This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 51e26d3e feat(cli): add expire-snapshots command with dry-run (#1063)
51e26d3e is described below
commit 51e26d3ef6989a4b2d03ef0e8a90756491ca0c0e
Author: Tanmay Rauth <[email protected]>
AuthorDate: Wed May 13 12:35:58 2026 -0700
feat(cli): add expire-snapshots command with dry-run (#1063)
Add `iceberg expire-snapshots TABLE_ID` with --older-than,
--retain-last, --dry-run, and --yes flags. Uses StagedTable diff for
dry-run preview.
Related: #957
Depends On: #1073
---
cmd/iceberg/expire_snapshots.go | 168 +++++++++++++++++++++++++++++++++--
cmd/iceberg/expire_snapshots_test.go | 150 +++++++++++++++++++++++++++++++
2 files changed, 312 insertions(+), 6 deletions(-)
diff --git a/cmd/iceberg/expire_snapshots.go b/cmd/iceberg/expire_snapshots.go
index 46a10d6e..852b5336 100644
--- a/cmd/iceberg/expire_snapshots.go
+++ b/cmd/iceberg/expire_snapshots.go
@@ -19,16 +19,172 @@ package main
import (
"context"
- "errors"
+ "encoding/json"
+ "fmt"
"os"
+ "strconv"
+ "time"
"github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
+ "github.com/pterm/pterm"
)
-func runExpireSnapshots(_ context.Context, output Output, _ catalog.Catalog, _
*ExpireSnapshotsCmd) {
- output.Error(errors.New("expire-snapshots: not yet implemented"))
- os.Exit(1)
+// runExpireSnapshots implements the expire-snapshots command for the table
+// named by cmd.TableID.
+//
+// The expiration is first applied to a preview transaction that is never
+// committed; the staged snapshot list is diffed against the current one to
+// learn which snapshots would be removed. When cmd.DryRun is set, or when
+// nothing would be removed, that preview is reported and the function returns.
+// Otherwise the user is asked to confirm (cmd.Yes is forwarded to
+// confirmAction), the expiration is applied again in a second transaction,
+// committed, and the same list is reported as expired.
+//
+// Every failure is reported through output.Error and ends the process with
+// exit status 1.
+func runExpireSnapshots(ctx context.Context, output Output, cat catalog.Catalog, cmd *ExpireSnapshotsCmd) {
+	tbl := loadTable(ctx, output, cat, cmd.TableID)
+
+	// baseOpts holds only what the user asked for. It is passed unchanged to
+	// the transaction that is actually committed.
+	var baseOpts []table.ExpireSnapshotsOpt
+
+	if cmd.OlderThan != "" {
+		d, err := parseDuration(cmd.OlderThan)
+		if err != nil {
+			output.Error(fmt.Errorf("invalid --older-than: %w", err))
+			os.Exit(1)
+		}
+
+		baseOpts = append(baseOpts, table.WithOlderThan(d))
+	}
+
+	if cmd.RetainLast != nil {
+		baseOpts = append(baseOpts, table.WithRetainLast(*cmd.RetainLast))
+	}
+
+	// The preview additionally carries WithPostCommit(false). Keeping it in a
+	// separate, freshly allocated slice means the commit path never has to
+	// strip it out again by position.
+	// NOTE(review): WithPostCommit(false) presumably suppresses post-commit
+	// work for the preview; confirm against the table package.
+	previewOpts := make([]table.ExpireSnapshotsOpt, 0, len(baseOpts)+1)
+	previewOpts = append(previewOpts, baseOpts...)
+	previewOpts = append(previewOpts, table.WithPostCommit(false))
+
+	previewTx := tbl.NewTransaction()
+	if err := previewTx.ExpireSnapshots(previewOpts...); err != nil {
+		output.Error(fmt.Errorf("expire snapshots failed: %w", err))
+		os.Exit(1)
+	}
+
+	staged, err := previewTx.StagedTable()
+	if err != nil {
+		output.Error(fmt.Errorf("staging table failed: %w", err))
+		os.Exit(1)
+	}
+
+	tableName := tableIDString(tbl)
+	expired := diffSnapshots(tbl.Metadata().Snapshots(), staged.Metadata().Snapshots())
+
+	result := ExpireSnapshotsResult{
+		DryRun:               cmd.DryRun,
+		Table:                tableName,
+		ExpiredSnapshotCount: len(expired),
+		ExpiredSnapshots:     expired,
+	}
+
+	// Nothing to commit: either only a preview was requested or the preview
+	// shows that no snapshot would be removed.
+	if cmd.DryRun || len(expired) == 0 {
+		output.ExpireSnapshotsResult(result)
+
+		return
+	}
+
+	prompt := fmt.Sprintf("Expire %d snapshot(s) from %s?", len(expired), tableName)
+	if err := confirmAction(prompt, cmd.Yes); err != nil {
+		output.Error(err)
+		os.Exit(1)
+	}
+
+	realTx := tbl.NewTransaction()
+	if err := realTx.ExpireSnapshots(baseOpts...); err != nil {
+		output.Error(fmt.Errorf("expire snapshots failed: %w", err))
+		os.Exit(1)
+	}
+
+	if _, err := realTx.Commit(ctx); err != nil {
+		output.Error(fmt.Errorf("commit failed: %w", err))
+		os.Exit(1)
+	}
+
+	// result.DryRun is already false here: the dry-run case returned above.
+	output.ExpireSnapshotsResult(result)
+}
+
+// diffSnapshots returns one SnapshotEntry for every snapshot in before whose
+// SnapshotID does not appear in after, in the order they occur in before.
+//
+// Each entry carries the snapshot timestamp formatted as RFC 3339 in UTC. When
+// a snapshot has a Summary, its Operation and the "added-data-files" and
+// "deleted-data-files" properties are copied over; otherwise Operation is
+// empty and both file counts stay at the placeholder "-".
+//
+// The returned slice is never nil, so encoding/json does not render an empty
+// diff as null.
+func diffSnapshots(before, after []table.Snapshot) []SnapshotEntry {
+	kept := make(map[int64]struct{}, len(after))
+	for i := range after {
+		kept[after[i].SnapshotID] = struct{}{}
+	}
+
+	expired := make([]SnapshotEntry, 0)
+
+	for _, s := range before {
+		if _, ok := kept[s.SnapshotID]; ok {
+			continue
+		}
+
+		op := ""
+		addedFiles := "-"
+		deletedFiles := "-"
+
+		if s.Summary != nil {
+			op = string(s.Summary.Operation)
+			if v, ok := s.Summary.Properties["added-data-files"]; ok {
+				addedFiles = v
+			}
+			if v, ok := s.Summary.Properties["deleted-data-files"]; ok {
+				deletedFiles = v
+			}
+		}
+
+		expired = append(expired, SnapshotEntry{
+			SnapshotID:       s.SnapshotID,
+			Timestamp:        time.UnixMilli(s.TimestampMs).UTC().Format(time.RFC3339),
+			ParentSnapshotID: s.ParentSnapshotID,
+			Operation:        op,
+			AddedDataFiles:   addedFiles,
+			DeletedDataFiles: deletedFiles,
+		})
+	}
+
+	return expired
}
-func (textOutput) ExpireSnapshotsResult(_ ExpireSnapshotsResult) {}
-func (jsonOutput) ExpireSnapshotsResult(_ ExpireSnapshotsResult) {}
+// ExpireSnapshotsResult prints result as human-readable text through pterm.
+//
+// A result with no expired snapshots prints a single notice and nothing else.
+// Otherwise a headline (starting with "[DRY RUN]" for dry runs) is followed by
+// a table listing the ID, timestamp and operation of each expired snapshot.
+func (t textOutput) ExpireSnapshotsResult(result ExpireSnapshotsResult) {
+	count := result.ExpiredSnapshotCount
+
+	switch {
+	case count == 0:
+		pterm.Println("No snapshots to expire.")
+
+		return
+	case result.DryRun:
+		pterm.Printfln("[DRY RUN] %d snapshots would be expired:", count)
+	default:
+		pterm.Printfln("Expired %d snapshots from %s.", count, result.Table)
+	}
+
+	rows := pterm.TableData{{"SNAPSHOT ID", "TIMESTAMP", "OP"}}
+	for _, entry := range result.ExpiredSnapshots {
+		id := strconv.FormatInt(entry.SnapshotID, 10)
+		rows = append(rows, []string{id, entry.Timestamp, entry.Operation})
+	}
+
+	printer := pterm.DefaultTable.
+		WithHasHeader(true).
+		WithHeaderRowSeparator("-").
+		WithData(rows)
+
+	// Render's return value is not inspected.
+	printer.Render()
+}
+
+// ExpireSnapshotsResult writes result to standard output as one JSON document.
+// An encoding failure is handed to j.Error.
+func (j jsonOutput) ExpireSnapshotsResult(result ExpireSnapshotsResult) {
+	enc := json.NewEncoder(os.Stdout)
+
+	err := enc.Encode(result)
+	if err == nil {
+		return
+	}
+
+	j.Error(err)
+}
diff --git a/cmd/iceberg/expire_snapshots_test.go
b/cmd/iceberg/expire_snapshots_test.go
new file mode 100644
index 00000000..70f3d5f3
--- /dev/null
+++ b/cmd/iceberg/expire_snapshots_test.go
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "bytes"
+ "os"
+ "testing"
+
+ "github.com/apache/iceberg-go/table"
+ "github.com/pterm/pterm"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestDiffSnapshots checks that snapshots missing from the "after" list are
+// reported in their original order, with the operation taken from the summary.
+func TestDiffSnapshots(t *testing.T) {
+	kept := table.Snapshot{SnapshotID: 3, TimestampMs: 3000000000000, Summary: &table.Summary{Operation: table.OpAppend}}
+
+	before := []table.Snapshot{
+		{SnapshotID: 1, TimestampMs: 1000000000000, Summary: &table.Summary{Operation: table.OpAppend}},
+		{SnapshotID: 2, TimestampMs: 2000000000000, Summary: &table.Summary{Operation: table.OpOverwrite}},
+		kept,
+	}
+	after := []table.Snapshot{kept}
+
+	want := []struct {
+		id int64
+		op string
+	}{
+		{id: 1, op: "append"},
+		{id: 2, op: "overwrite"},
+	}
+
+	got := diffSnapshots(before, after)
+
+	require.Len(t, got, len(want))
+
+	for i, w := range want {
+		assert.Equal(t, w.id, got[i].SnapshotID)
+		assert.Equal(t, w.op, got[i].Operation)
+	}
+}
+
+// TestDiffSnapshotsNoneExpired checks that diffing a snapshot list against
+// itself yields no expired entries.
+func TestDiffSnapshotsNoneExpired(t *testing.T) {
+	only := table.Snapshot{
+		SnapshotID:  1,
+		TimestampMs: 1000000000000,
+		Summary:     &table.Summary{Operation: table.OpAppend},
+	}
+	snaps := []table.Snapshot{only}
+
+	assert.Empty(t, diffSnapshots(snaps, snaps))
+}
+
+// TestTextOutputExpireSnapshotsResultDryRun checks the text rendering of a
+// dry-run result: the "[DRY RUN]" headline with the count and both snapshot
+// IDs must appear in the captured pterm output.
+func TestTextOutputExpireSnapshotsResultDryRun(t *testing.T) {
+	var captured bytes.Buffer
+	pterm.SetDefaultOutput(&captured)
+	pterm.DisableColor()
+
+	entries := []SnapshotEntry{
+		{SnapshotID: 100, Timestamp: "2024-01-01T00:00:00Z", Operation: "append"},
+		{SnapshotID: 200, Timestamp: "2024-01-02T00:00:00Z", Operation: "overwrite"},
+	}
+	result := ExpireSnapshotsResult{
+		DryRun:               true,
+		Table:                "db.events",
+		ExpiredSnapshotCount: len(entries),
+		ExpiredSnapshots:     entries,
+	}
+
+	captured.Reset()
+	textOutput{}.ExpireSnapshotsResult(result)
+
+	got := captured.String()
+	for _, want := range []string{"[DRY RUN]", "2 snapshots would be expired", "100", "200"} {
+		assert.Contains(t, got, want)
+	}
+}
+
+// TestTextOutputExpireSnapshotsResultCommitted checks the text rendering of a
+// committed (non-dry-run) result: the "Expired ..." headline naming the table
+// and the snapshot ID must appear in the captured pterm output.
+func TestTextOutputExpireSnapshotsResultCommitted(t *testing.T) {
+	var captured bytes.Buffer
+	pterm.SetDefaultOutput(&captured)
+	pterm.DisableColor()
+
+	entries := []SnapshotEntry{
+		{SnapshotID: 100, Timestamp: "2024-01-01T00:00:00Z", Operation: "append"},
+	}
+	result := ExpireSnapshotsResult{
+		DryRun:               false,
+		Table:                "db.events",
+		ExpiredSnapshotCount: len(entries),
+		ExpiredSnapshots:     entries,
+	}
+
+	captured.Reset()
+	textOutput{}.ExpireSnapshotsResult(result)
+
+	got := captured.String()
+	for _, want := range []string{"Expired 1 snapshots from db.events.", "100"} {
+		assert.Contains(t, got, want)
+	}
+}
+
+// TestTextOutputExpireSnapshotsResultEmpty checks that a result with zero
+// expired snapshots renders the "No snapshots to expire." notice.
+func TestTextOutputExpireSnapshotsResultEmpty(t *testing.T) {
+	var captured bytes.Buffer
+	pterm.SetDefaultOutput(&captured)
+	pterm.DisableColor()
+
+	var none []SnapshotEntry
+	result := ExpireSnapshotsResult{
+		DryRun:               false,
+		Table:                "db.events",
+		ExpiredSnapshotCount: 0,
+		ExpiredSnapshots:     none,
+	}
+
+	captured.Reset()
+	textOutput{}.ExpireSnapshotsResult(result)
+
+	got := captured.String()
+	assert.Contains(t, got, "No snapshots to expire.")
+}
+
+// TestJSONOutputExpireSnapshotsResult checks the JSON rendering of a result by
+// temporarily pointing os.Stdout at a pipe and inspecting what was written.
+func TestJSONOutputExpireSnapshotsResult(t *testing.T) {
+	r, w, err := os.Pipe()
+	require.NoError(t, err)
+	defer r.Close()
+
+	oldStdout := os.Stdout
+	os.Stdout = w
+	defer func() { os.Stdout = oldStdout }()
+
+	result := ExpireSnapshotsResult{
+		DryRun:               true,
+		Table:                "db.events",
+		ExpiredSnapshotCount: 1,
+		ExpiredSnapshots: []SnapshotEntry{
+			{SnapshotID: 100, Timestamp: "2024-01-01T00:00:00Z", Operation: "append"},
+		},
+	}
+
+	jsonOutput{}.ExpireSnapshotsResult(result)
+
+	// The write end must be closed before reading, otherwise ReadFrom would
+	// block waiting for EOF.
+	require.NoError(t, w.Close())
+
+	var buf bytes.Buffer
+	_, err = buf.ReadFrom(r)
+	require.NoError(t, err)
+
+	output := buf.String()
+	assert.Contains(t, output, `"dry_run":true`)
+	assert.Contains(t, output, `"table":"db.events"`)
+	assert.Contains(t, output, `"expired_snapshot_count":1`)
+	assert.Contains(t, output, `"snapshot_id":100`)
+}