This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 14470e5b feat(cli): add clean-orphan-files command (#1066)
14470e5b is described below
commit 14470e5b24fecdd7867e6bb9724762268f29c06b
Author: Tanmay Rauth <[email protected]>
AuthorDate: Wed May 13 12:36:24 2026 -0700
feat(cli): add clean-orphan-files command (#1066)
Add `iceberg clean-orphan-files TABLE_ID` with --older-than (default
72h), --dry-run, --yes, and --location flags.
Related: #957
Depends On: #1073
---
cmd/iceberg/clean_orphan_files.go | 128 ++++++++++++++++++++--
cmd/iceberg/clean_orphan_files_test.go | 188 +++++++++++++++++++++++++++++++++
2 files changed, 310 insertions(+), 6 deletions(-)
diff --git a/cmd/iceberg/clean_orphan_files.go b/cmd/iceberg/clean_orphan_files.go
index 98297bb4..e55e7c4c 100644
--- a/cmd/iceberg/clean_orphan_files.go
+++ b/cmd/iceberg/clean_orphan_files.go
@@ -19,16 +19,132 @@ package main
import (
"context"
- "errors"
+ "encoding/json"
+ "fmt"
"os"
+ "strconv"
"github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
+ "github.com/pterm/pterm"
)
-func runCleanOrphanFiles(_ context.Context, output Output, _ catalog.Catalog,
_ *CleanOrphanFilesCmd) {
- output.Error(errors.New("clean-orphan-files: not yet implemented"))
- os.Exit(1)
+// runCleanOrphanFiles implements the clean-orphan-files command.
+//
+// It first calls DeleteOrphanFiles with dry-run enabled so the orphan set can
+// be reported. When cmd.DryRun is set, or the scan finds no orphans, that scan
+// result is printed and the function returns. Otherwise the user is asked to
+// confirm via confirmAction (which receives cmd.Yes) and DeleteOrphanFiles is
+// called a second time with dry-run disabled; the result of that second call
+// is what gets printed.
+//
+// A bad --older-than value, a failed scan, a failed confirmation, or a failed
+// deletion is reported through output.Error followed by os.Exit(1).
+func runCleanOrphanFiles(ctx context.Context, output Output, cat catalog.Catalog, cmd *CleanOrphanFilesCmd) {
+	olderThan, err := parseDuration(cmd.OlderThan)
+	if err != nil {
+		output.Error(fmt.Errorf("invalid --older-than: %w", err))
+		os.Exit(1)
+	}
+
+	tbl := loadTable(ctx, output, cat, cmd.TableID)
+
+	// cleanupOpts builds the option list used by both passes; only the
+	// dry-run flag differs between the scan and the real deletion.
+	cleanupOpts := func(dryRun bool) []table.OrphanCleanupOption {
+		o := []table.OrphanCleanupOption{
+			table.WithFilesOlderThan(olderThan),
+			table.WithDryRun(dryRun),
+		}
+		if cmd.Location != "" {
+			o = append(o, table.WithLocation(cmd.Location))
+		}
+
+		return o
+	}
+
+	scan, err := tbl.DeleteOrphanFiles(ctx, cleanupOpts(true)...)
+	if err != nil {
+		output.Error(fmt.Errorf("orphan file scan failed: %w", err))
+		os.Exit(1)
+	}
+
+	orphanCount := len(scan.OrphanFileLocations)
+	report := buildCleanOrphanFilesResult(tbl, scan, cmd.DryRun)
+
+	// Nothing more to do for an explicit dry run or an empty scan.
+	if cmd.DryRun || orphanCount == 0 {
+		output.CleanOrphanFilesResult(report)
+
+		return
+	}
+
+	question := fmt.Sprintf("Delete %d orphan file(s) (%s) from %s?",
+		orphanCount, formatBytes(scan.TotalSizeBytes), tableIDString(tbl))
+	if err := confirmAction(question, cmd.Yes); err != nil {
+		output.Error(err)
+		os.Exit(1)
+	}
+
+	deleted, err := tbl.DeleteOrphanFiles(ctx, cleanupOpts(false)...)
+	if err != nil {
+		output.Error(fmt.Errorf("orphan file deletion failed: %w", err))
+		os.Exit(1)
+	}
+
+	output.CleanOrphanFilesResult(buildCleanOrphanFilesResult(tbl, deleted, false))
}
-func (textOutput) CleanOrphanFilesResult(_ CleanOrphanFilesResult) {}
-func (jsonOutput) CleanOrphanFilesResult(_ CleanOrphanFilesResult) {}
+// buildCleanOrphanFilesResult converts a table.OrphanCleanupResult into the
+// CLI's CleanOrphanFilesResult.
+//
+// For a dry run the listed files are result.OrphanFileLocations; otherwise
+// they are result.DeletedFiles. OrphanFileCount is the number of listed files
+// and TotalSizeBytes is copied from result unchanged.
+func buildCleanOrphanFilesResult(tbl *table.Table, result table.OrphanCleanupResult, dryRun bool) CleanOrphanFilesResult {
+	paths := result.DeletedFiles
+	if dryRun {
+		paths = result.OrphanFileLocations
+	}
+
+	out := CleanOrphanFilesResult{
+		DryRun:          dryRun,
+		Table:           tableIDString(tbl),
+		OrphanFileCount: len(paths),
+		TotalSizeBytes:  result.TotalSizeBytes,
+		OrphanFiles:     make([]OrphanFileEntry, len(paths)),
+	}
+	for i, p := range paths {
+		out.OrphanFiles[i].Path = p
+	}
+
+	return out
+}
+
+// CleanOrphanFilesResult prints a human-readable report of result via pterm.
+//
+// When result.OrphanFileCount is zero only "No orphan files found." is
+// printed. Otherwise a one-line summary (prefixed with "[DRY RUN]" when
+// result.DryRun is set) is followed by a numbered table of file paths.
+func (t textOutput) CleanOrphanFilesResult(result CleanOrphanFilesResult) {
+	if result.OrphanFileCount == 0 {
+		pterm.Println("No orphan files found.")
+
+		return
+	}
+
+	sizeStr := formatBytes(result.TotalSizeBytes)
+
+	// Pick the noun by count so a single file is not reported as "1 orphan files".
+	noun := "files"
+	if result.OrphanFileCount == 1 {
+		noun = "file"
+	}
+
+	if result.DryRun {
+		pterm.Printfln("[DRY RUN] %d orphan %s found (%s):", result.OrphanFileCount, noun, sizeStr)
+	} else {
+		pterm.Printfln("Deleted %d orphan %s (%s) from %s.", result.OrphanFileCount, noun, sizeStr, result.Table)
+	}
+
+	// The first row is the header; file rows are numbered starting at 1.
+	data := pterm.TableData{{"#", "PATH"}}
+
+	for i, f := range result.OrphanFiles {
+		data = append(data, []string{
+			strconv.Itoa(i + 1),
+			f.Path,
+		})
+	}
+
+	// NOTE(review): the value returned by Render is not checked here.
+	pterm.DefaultTable.
+		WithHasHeader(true).
+		WithHeaderRowSeparator("-").
+		WithData(data).Render()
+}
+
+// CleanOrphanFilesResult writes result to standard output as one JSON
+// document followed by a newline. An encoding or write failure is handed to
+// j.Error.
+func (j jsonOutput) CleanOrphanFilesResult(result CleanOrphanFilesResult) {
+	enc := json.NewEncoder(os.Stdout)
+
+	err := enc.Encode(result)
+	if err == nil {
+		return
+	}
+
+	j.Error(err)
+}
diff --git a/cmd/iceberg/clean_orphan_files_test.go b/cmd/iceberg/clean_orphan_files_test.go
new file mode 100644
index 00000000..490dc033
--- /dev/null
+++ b/cmd/iceberg/clean_orphan_files_test.go
@@ -0,0 +1,188 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "bytes"
+ "os"
+ "testing"
+
+ "github.com/apache/iceberg-go/table"
+ "github.com/pterm/pterm"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+// TestBuildCleanOrphanFilesResultDryRun checks that, with dryRun set to true,
+// buildCleanOrphanFilesResult reports the entries of OrphanFileLocations
+// together with the table identifier and the total size from the input.
+func TestBuildCleanOrphanFilesResultDryRun(t *testing.T) {
+ const metadata = `{
+ "format-version": 2,
+ "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+ "location": "s3://bucket/test/location",
+ "last-sequence-number": 0,
+ "last-updated-ms": 1602638573590,
+ "last-column-id": 1,
+ "current-schema-id": 0,
+ "schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id": 1,
"name": "x", "required": true, "type": "long"}]}],
+ "default-spec-id": 0,
+ "partition-specs": [{"spec-id": 0, "fields": []}],
+ "last-partition-id": 0,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {},
+ "current-snapshot-id": -1,
+ "snapshots": [],
+ "snapshot-log": [],
+ "metadata-log": [],
+ "refs": {}
+ }`
+
+ // Parse the inline metadata document (one schema, no snapshots).
+ meta, err := table.ParseMetadataBytes([]byte(metadata))
+ require.NoError(t, err)
+
+ // The identifier {"db", "orphans"} is expected to surface as "db.orphans".
+ tbl := table.New([]string{"db", "orphans"}, meta, "", nil, nil)
+
+ orphanResult := table.OrphanCleanupResult{
+ OrphanFileLocations: []string{"s3://bucket/data/file1.parquet",
"s3://bucket/data/file2.parquet"},
+ TotalSizeBytes: 4096,
+ }
+
+ result := buildCleanOrphanFilesResult(tbl, orphanResult, true)
+
+ assert.True(t, result.DryRun)
+ assert.Equal(t, "db.orphans", result.Table)
+ assert.Equal(t, 2, result.OrphanFileCount)
+ assert.Equal(t, int64(4096), result.TotalSizeBytes)
+ // require (not assert) so the index below cannot go out of range.
+ require.Len(t, result.OrphanFiles, 2)
+ assert.Equal(t, "s3://bucket/data/file1.parquet",
result.OrphanFiles[0].Path)
+}
+
+// TestBuildCleanOrphanFilesResultDeleted checks that, with dryRun set to
+// false, buildCleanOrphanFilesResult reports the entries of DeletedFiles
+// rather than OrphanFileLocations. The two lists differ in the fixture so the
+// assertions can tell which one was used.
+func TestBuildCleanOrphanFilesResultDeleted(t *testing.T) {
+ const metadata = `{
+ "format-version": 2,
+ "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1",
+ "location": "s3://bucket/test/location",
+ "last-sequence-number": 0,
+ "last-updated-ms": 1602638573590,
+ "last-column-id": 1,
+ "current-schema-id": 0,
+ "schemas": [{"type": "struct", "schema-id": 0, "fields": [{"id": 1,
"name": "x", "required": true, "type": "long"}]}],
+ "default-spec-id": 0,
+ "partition-specs": [{"spec-id": 0, "fields": []}],
+ "last-partition-id": 0,
+ "default-sort-order-id": 0,
+ "sort-orders": [{"order-id": 0, "fields": []}],
+ "properties": {},
+ "current-snapshot-id": -1,
+ "snapshots": [],
+ "snapshot-log": [],
+ "metadata-log": [],
+ "refs": {}
+ }`
+
+ meta, err := table.ParseMetadataBytes([]byte(metadata))
+ require.NoError(t, err)
+
+ tbl := table.New([]string{"db", "orphans"}, meta, "", nil, nil)
+
+ // Two orphans were found but only one was deleted; a result built from
+ // OrphanFileLocations would therefore report a count of 2.
+ orphanResult := table.OrphanCleanupResult{
+ OrphanFileLocations: []string{"s3://bucket/data/file1.parquet", "s3://bucket/data/file2.parquet"},
+ DeletedFiles: []string{"s3://bucket/data/file1.parquet"},
+ TotalSizeBytes: 2048,
+ }
+
+ result := buildCleanOrphanFilesResult(tbl, orphanResult, false)
+
+ assert.False(t, result.DryRun)
+ assert.Equal(t, 1, result.OrphanFileCount)
+ // require (not assert) so the index below cannot panic on an empty slice.
+ require.Len(t, result.OrphanFiles, 1)
+ assert.Equal(t, "s3://bucket/data/file1.parquet", result.OrphanFiles[0].Path)
+}
+
+func TestTextOutputCleanOrphanFilesResultEmpty(t *testing.T) {
+ var buf bytes.Buffer
+ pterm.SetDefaultOutput(&buf)
+ pterm.DisableColor()
+
+ result := CleanOrphanFilesResult{
+ DryRun: false,
+ Table: "db.tbl",
+ OrphanFileCount: 0,
+ TotalSizeBytes: 0,
+ OrphanFiles: nil,
+ }
+
+ buf.Reset()
+ textOutput{}.CleanOrphanFilesResult(result)
+
+ assert.Contains(t, buf.String(), "No orphan files found.")
+}
+
+func TestTextOutputCleanOrphanFilesResultDryRun(t *testing.T) {
+ var buf bytes.Buffer
+ pterm.SetDefaultOutput(&buf)
+ pterm.DisableColor()
+
+ result := CleanOrphanFilesResult{
+ DryRun: true,
+ Table: "db.tbl",
+ OrphanFileCount: 2,
+ TotalSizeBytes: 1048576,
+ OrphanFiles: []OrphanFileEntry{
+ {Path: "s3://bucket/data/a.parquet"},
+ {Path: "s3://bucket/data/b.parquet"},
+ },
+ }
+
+ buf.Reset()
+ textOutput{}.CleanOrphanFilesResult(result)
+
+ output := buf.String()
+ assert.Contains(t, output, "[DRY RUN]")
+ assert.Contains(t, output, "2 orphan files found")
+ assert.Contains(t, output, "1.0 MB")
+ assert.Contains(t, output, "s3://bucket/data/a.parquet")
+}
+
+// TestJSONOutputCleanOrphanFilesResult redirects os.Stdout to a pipe, runs the
+// JSON renderer, and checks that the expected key/value pairs appear in the
+// emitted document.
+func TestJSONOutputCleanOrphanFilesResult(t *testing.T) {
+	r, w, err := os.Pipe()
+	require.NoError(t, err)
+	defer r.Close()
+
+	oldStdout := os.Stdout
+	os.Stdout = w
+	defer func() { os.Stdout = oldStdout }()
+
+	result := CleanOrphanFilesResult{
+		DryRun:          true,
+		Table:           "db.tbl",
+		OrphanFileCount: 1,
+		TotalSizeBytes:  512,
+		OrphanFiles: []OrphanFileEntry{
+			{Path: "s3://bucket/data/orphan.parquet"},
+		},
+	}
+
+	jsonOutput{}.CleanOrphanFilesResult(result)
+
+	// Close the write end so the read below sees EOF instead of blocking.
+	require.NoError(t, w.Close())
+
+	var buf bytes.Buffer
+	_, err = buf.ReadFrom(r)
+	require.NoError(t, err)
+
+	output := buf.String()
+	assert.Contains(t, output, `"dry_run":true`)
+	assert.Contains(t, output, `"table":"db.tbl"`)
+	assert.Contains(t, output, `"orphan_file_count":1`)
+	assert.Contains(t, output, `"total_size_bytes":512`)
+	assert.Contains(t, output, `"path":"s3://bucket/data/orphan.parquet"`)
+}