This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 7db5a383 feat(catalog): add PurgeTable as optional interface for
physical file deletion (#1104)
7db5a383 is described below
commit 7db5a38379e49e2cf2848ee745833dd9f6c66e61
Author: Mithil Girish <[email protected]>
AuthorDate: Sat May 23 03:09:04 2026 +0530
feat(catalog): add PurgeTable as optional interface for physical file
deletion (#1104)
Fixes #1092
## Summary
Adds `PurgeableTable` as an optional interface so catalogs can support
physical deletion of table files when dropping a table.
Today, `DropTable` only removes the catalog metadata entry — physical
files (Parquet, manifests, metadata JSON) remain on storage. This PR
closes that gap for client-side catalogs without making any breaking
changes to the `Catalog.DropTable` signature.
## Changes
### Core
- `catalog/catalog.go` — add `PurgeableTable` optional interface
- `catalog/internal/utils.go` — add `PurgeTableFiles` shared helper
using `WalkDir` + `BulkRemovableIO` fallback for storage-agnostic
deletion
### Per-catalog
- **SQL**: load table metadata, drop catalog entry, walk and delete all
files under the table location via `WalkDir` + `BulkRemovableIO`
fallback
- **Glue**: same pattern; falls back to metadata-only drop if table
files
are inaccessible (e.g. S3 permissions)
- **Hive**: same pattern; a `LoadTable` failure is returned to the
caller, while file-deletion failures after the drop are only logged
- **Hadoop**: purges the table's files first, then delegates to
`DropTable`, which removes the table directory tree
- **REST**: existing `PurgeTable` (`purgeRequested=true`) satisfies the
interface automatically — only a compile-time interface assertion is added
### CLI
- `cmd/iceberg/main.go` — wire `--purge` flag into `drop table` command
via type assertion against `PurgeableTable`; exits with an error if the
catalog does not support client-side purge
## Notes
- No breaking changes to `Catalog.DropTable` signature
- Client-side purge is best-effort — if interrupted between catalog drop
and file deletion, orphaned files may remain. For reliable cleanup,
prefer `expire-snapshots` + `remove-orphan-files`
- REST catalog delegates purge to the server as before
## Testing
- `catalog/sql/sql_test.go` — verifies physical file deletion on local
storage
- `catalog/glue/glue_test.go` — mock test for purge invocation and
fallback
- `catalog/hive/hive_test.go` — mock test for purge invocation and
fallback
- `cmd/iceberg/drop_test.go` — CLI flag parsing and purge dispatch tests
---------
Signed-off-by: Mithil Girish <[email protected]>
---
catalog/catalog.go | 23 +++++
catalog/glue/glue.go | 23 +++++
catalog/hadoop/hadoop.go | 17 ++++
catalog/hive/hive.go | 23 +++++
catalog/rest/rest.go | 2 +
catalog/sql/sql.go | 23 +++++
catalog/sql/sql_test.go | 160 ++++++++++++++++++++++++++++++
cmd/iceberg/args_test.go | 11 +++
cmd/iceberg/drop_test.go | 226 +++++++++++++++++++++++++++++++++++++++++++
cmd/iceberg/main.go | 18 +++-
go.mod | 2 +-
table/orphan_cleanup.go | 187 +++++++++++++++++++++++++++++++----
table/orphan_cleanup_test.go | 20 ++--
13 files changed, 703 insertions(+), 32 deletions(-)
diff --git a/catalog/catalog.go b/catalog/catalog.go
index b48f0c49..7781901b 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -167,6 +167,29 @@ type TransactionalCatalog interface {
CommitTransaction(ctx context.Context, commits []table.TableCommit)
error
}
+// PurgeableTable is an optional interface that catalogs can implement
+// to support physical table deletion (catalog entry + underlying files).
+// Callers should check for this capability via a type assertion:
+//
+// if purger, ok := cat.(catalog.PurgeableTable); ok {
+// err := purger.PurgeTable(ctx, ident)
+// }
+//
+// For REST catalogs the purge is delegated server-side. For client-side
+// catalogs (SQL, Glue, Hive, Hadoop) the table is first dropped from
+// the catalog, then all files under the table's [table.Metadata.Location]
+// root, plus any referenced files written outside the root (e.g. via
+// write.data.path or write.metadata.path table properties), are physically
deleted.
+//
+// Physical file deletion is best-effort. If gc.enabled is false in the table
+// properties, data files are left in place but metadata files are still deleted.
+// Any file-deletion, directory walk, or listing errors encountered are logged
+// as warnings, and the operation succeeds (returns nil) so that the catalog
+// drop remains the single source of truth.
+type PurgeableTable interface {
+ PurgeTable(ctx context.Context, identifier table.Identifier) error
+}
+
func ToIdentifier(ident ...string) table.Identifier {
if len(ident) == 1 {
if ident[0] == "" {
diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go
index 0611aef3..f623730f 100644
--- a/catalog/glue/glue.go
+++ b/catalog/glue/glue.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"iter"
+ "log"
"maps"
"strconv"
"strings"
@@ -137,6 +138,8 @@ type glueAPI interface {
UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput,
optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error)
}
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
type Catalog struct {
glueSvc glueAPI
catalogId *string
@@ -405,6 +408,26 @@ func (c *Catalog) DropTable(ctx context.Context,
identifier table.Identifier) er
return nil
}
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier)
error {
+ tbl, err := c.LoadTable(ctx, identifier)
+ if err != nil {
+ return err
+ }
+
+ // Drop the table entry from the catalog first
+ err = c.DropTable(ctx, identifier)
+ if err != nil {
+ return err
+ }
+
+ // Physically delete all table files on storage best-effort
+ if purgeErr := tbl.PurgeFiles(ctx); purgeErr != nil {
+ log.Printf("WARNING: dropped table %s but failed to purge
files: %v", identifier, purgeErr)
+ }
+
+ return nil
+}
+
// RenameTable renames an Iceberg table in the Glue catalog.
func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier)
(*table.Table, error) {
fromDatabase, fromTable, err := identifierToGlueTable(from)
diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go
index be7e8c24..bb16f3d3 100644
--- a/catalog/hadoop/hadoop.go
+++ b/catalog/hadoop/hadoop.go
@@ -95,6 +95,8 @@ func validateIdentifier(ident table.Identifier) error {
// Catalog is a filesystem-based Iceberg catalog that requires no external
// metastore. All state lives on disk as directories and versioned JSON
// metadata files. Currently only local filesystem paths are supported.
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
type Catalog struct {
name string
warehouse string
@@ -561,6 +563,21 @@ func (c *Catalog) DropTable(_ context.Context, ident
table.Identifier) error {
return os.RemoveAll(tablePath)
}
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier)
error {
+ tbl, err := c.LoadTable(ctx, identifier)
+ if err != nil {
+ return err
+ }
+
+ // For the Hadoop catalog, the physical file walk must run BEFORE the table directory root is deleted
+ if purgeErr := tbl.PurgeFiles(ctx); purgeErr != nil {
+ log.Printf("WARNING: failing to purge some files in Hadoop
table %s: %v", identifier, purgeErr)
+ }
+
+ // Delete the table directory root from the local storage
+ return c.DropTable(ctx, identifier)
+}
+
func (c *Catalog) RenameTable(_ context.Context, _, _ table.Identifier)
(*table.Table, error) {
return nil, errors.New("hadoop catalog: rename table is not supported")
}
diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go
index cd86aebc..e88d30a2 100644
--- a/catalog/hive/hive.go
+++ b/catalog/hive/hive.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"iter"
+ "log"
"maps"
"strings"
_ "unsafe"
@@ -49,6 +50,8 @@ func init() {
}))
}
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
type Catalog struct {
client HiveClient
opts *HiveOptions
@@ -391,6 +394,26 @@ func (c *Catalog) DropTable(ctx context.Context,
identifier table.Identifier) er
return nil
}
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier)
error {
+ tbl, err := c.LoadTable(ctx, identifier)
+ if err != nil {
+ return err
+ }
+
+ // Drop the table entry from the catalog first
+ err = c.DropTable(ctx, identifier)
+ if err != nil {
+ return err
+ }
+
+ // Physically delete all table files on storage best-effort
+ if purgeErr := tbl.PurgeFiles(ctx); purgeErr != nil {
+ log.Printf("WARNING: dropped table %s but failed to purge
files: %v", identifier, purgeErr)
+ }
+
+ return nil
+}
+
func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier)
(*table.Table, error) {
fromDB, fromTable, err := identifierToTableName(from)
if err != nil {
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index 3950afb7..703a48d5 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -520,6 +520,8 @@ func toProps(o *options) iceberg.Properties {
return props
}
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
type Catalog struct {
baseURI *url.URL
cl *http.Client
diff --git a/catalog/sql/sql.go b/catalog/sql/sql.go
index 2c196b70..1c2d6b29 100644
--- a/catalog/sql/sql.go
+++ b/catalog/sql/sql.go
@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"iter"
+ "log"
"maps"
"slices"
"strings"
@@ -169,6 +170,8 @@ func withWriteTx(ctx context.Context, db *bun.DB, fn
func(context.Context, bun.T
})
}
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
type Catalog struct {
db *bun.DB
name string
@@ -465,6 +468,26 @@ func (c *Catalog) DropTable(ctx context.Context,
identifier table.Identifier) er
})
}
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier)
error {
+ tbl, err := c.LoadTable(ctx, identifier)
+ if err != nil {
+ return err
+ }
+
+ // Drop the table entry from the catalog first
+ err = c.DropTable(ctx, identifier)
+ if err != nil {
+ return err
+ }
+
+ // Physically delete all table files on storage best-effort
+ if purgeErr := tbl.PurgeFiles(ctx); purgeErr != nil {
+ log.Printf("WARNING: dropped table %s but failed to purge
files: %v", identifier, purgeErr)
+ }
+
+ return nil
+}
+
func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier)
(*table.Table, error) {
fromNs := strings.Join(catalog.NamespaceFromIdent(from), ".")
fromTbl := catalog.TableNameFromIdent(from)
diff --git a/catalog/sql/sql_test.go b/catalog/sql/sql_test.go
index feac9308..b0b8eade 100644
--- a/catalog/sql/sql_test.go
+++ b/catalog/sql/sql_test.go
@@ -20,6 +20,7 @@ package sql_test
import (
"context"
"database/sql"
+ "encoding/json"
"fmt"
"maps"
"math/rand/v2"
@@ -29,6 +30,7 @@ import (
"strings"
"testing"
+ "github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
@@ -684,6 +686,164 @@ func (s *SqliteCatalogTestSuite) TestDropTableNotExist() {
}
}
+func (s *SqliteCatalogTestSuite) TestPurgeTable() {
+ tests := []struct {
+ cat *sqlcat.Catalog
+ tblID table.Identifier
+ }{
+ {s.getCatalogMemory(), s.randomTableIdentifier()},
+ {s.getCatalogSqlite(), s.randomHierarchicalIdentifier()},
+ }
+
+ for _, tt := range tests {
+ ns := catalog.NamespaceFromIdent(tt.tblID)
+
s.Require().NoError(tt.cat.CreateNamespace(context.Background(), ns, nil))
+
+ schema := iceberg.NewSchema(1, iceberg.NestedField{
+ ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.String, Required: true,
+ })
+ tbl, err := tt.cat.CreateTable(context.Background(), tt.tblID,
schema)
+ s.Require().NoError(err)
+
+ // Append data to create data files and manifest files
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "foo", Type: arrow.BinaryTypes.String},
+ }, nil)
+
+ bldr := array.NewStringBuilder(memory.DefaultAllocator)
+ bldr.Append("bar")
+ arr := bldr.NewArray()
+
+ rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 1)
+ arrTable := array.NewTableFromRecords(arrowSchema,
[]arrow.RecordBatch{rec})
+
+ tx := tbl.NewTransaction()
+ s.Require().NoError(tx.AppendTable(context.Background(),
arrTable, 1024, nil))
+ tbl, err = tx.Commit(context.Background())
+ s.Require().NoError(err)
+
+ arr.Release()
+ bldr.Release()
+ rec.Release()
+ arrTable.Release()
+
+ metaLoc := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+ tableLoc := strings.TrimPrefix(tbl.Location(), "file://")
+ s.FileExists(metaLoc)
+
+ // Create a dummy statistics file at an external path outside
the table Location()
+ externalStatsPath := filepath.Join(s.warehouse,
"external-path", "stats.puffin")
+
s.Require().NoError(os.MkdirAll(filepath.Dir(externalStatsPath), 0o755))
+ s.Require().NoError(os.WriteFile(externalStatsPath,
[]byte("dummy puffin data"), 0o644))
+ s.FileExists(externalStatsPath)
+
+ // Add this external statistics file to the table metadata JSON
+ metaBytes, err := os.ReadFile(metaLoc)
+ s.Require().NoError(err)
+ var metaMap map[string]any
+ s.Require().NoError(json.Unmarshal(metaBytes, &metaMap))
+ metaMap["statistics"] = []any{
+ map[string]any{
+ "snapshot-id":
tbl.Metadata().CurrentSnapshot().SnapshotID,
+ "statistics-path": "file://" +
externalStatsPath,
+ "file-size-in-bytes": 17,
+ "file-footer-size-in-bytes": 10,
+ "blob-metadata": []any{},
+ },
+ }
+ newMetaBytes, err := json.Marshal(metaMap)
+ s.Require().NoError(err)
+ s.Require().NoError(os.WriteFile(metaLoc, newMetaBytes, 0o644))
+
+ // Assert that the catalog implements PurgeableTable
+ purger, ok := any(tt.cat).(catalog.PurgeableTable)
+ s.Require().True(ok, "catalog must implement PurgeableTable")
+
+ s.NoError(purger.PurgeTable(context.Background(), tt.tblID))
+
+ // The table catalog entry should be gone
+ _, err = tt.cat.LoadTable(context.Background(), tt.tblID)
+ s.ErrorIs(err, catalog.ErrNoSuchTable)
+
+ // The external file must also be purged!
+ _, statErr := os.Stat(externalStatsPath)
+ s.True(os.IsNotExist(statErr), "expected external file to be
deleted: %s", externalStatsPath)
+
+ // The physical files should be completely removed.
+ // Note: Table.PurgeFiles relies on ListableIO and BulkRemovableIO which operate on files, not directories.
+ // As a result, empty directories (like data/ and metadata/)
may be left behind on local filesystems.
+ // This is expected and acceptable since object stores (like
S3) do not have true directories.
+ walkErr := filepath.Walk(tableLoc, func(path string, info
os.FileInfo, err error) error {
+ if os.IsNotExist(err) {
+ return nil
+ }
+ s.Require().NoError(err)
+ s.True(info.IsDir(), "expected only empty directories
to remain, found file: %s", path)
+
+ return nil
+ })
+ s.Require().NoError(walkErr)
+ }
+}
+
+func (s *SqliteCatalogTestSuite) TestPurgeTableGCDisabled() {
+ tests := []struct {
+ cat *sqlcat.Catalog
+ tblID table.Identifier
+ }{
+ {s.getCatalogMemory(), s.randomTableIdentifier()},
+ {s.getCatalogSqlite(), s.randomHierarchicalIdentifier()},
+ }
+
+ for _, tt := range tests {
+ ns := catalog.NamespaceFromIdent(tt.tblID)
+
s.Require().NoError(tt.cat.CreateNamespace(context.Background(), ns, nil))
+
+ schema := iceberg.NewSchema(1, iceberg.NestedField{
+ ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.String, Required: true,
+ })
+ tbl, err := tt.cat.CreateTable(context.Background(), tt.tblID,
schema, catalog.WithProperties(iceberg.Properties{"gc.enabled": "false"}))
+ s.Require().NoError(err)
+
+ // Append data to create data files and manifest files
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {Name: "foo", Type: arrow.BinaryTypes.String},
+ }, nil)
+
+ bldr := array.NewStringBuilder(memory.DefaultAllocator)
+ bldr.Append("bar")
+ arr := bldr.NewArray()
+
+ rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 1)
+ arrTable := array.NewTableFromRecords(arrowSchema,
[]arrow.RecordBatch{rec})
+
+ tx := tbl.NewTransaction()
+ s.Require().NoError(tx.AppendTable(context.Background(),
arrTable, 1024, nil))
+ tbl, err = tx.Commit(context.Background())
+ s.Require().NoError(err)
+
+ arr.Release()
+ bldr.Release()
+ rec.Release()
+ arrTable.Release()
+
+ metaLoc := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+ s.FileExists(metaLoc)
+
+ purger, ok := any(tt.cat).(catalog.PurgeableTable)
+ s.Require().True(ok, "catalog must implement PurgeableTable")
+
+ s.NoError(purger.PurgeTable(context.Background(), tt.tblID))
+
+ // The table catalog entry should be gone
+ _, err = tt.cat.LoadTable(context.Background(), tt.tblID)
+ s.ErrorIs(err, catalog.ErrNoSuchTable)
+
+ // With gc.enabled=false, metadata files are still deleted, but
data files are skipped.
+ s.NoFileExists(metaLoc)
+ }
+}
+
func (s *SqliteCatalogTestSuite) TestRenameTable() {
tests := []struct {
cat *sqlcat.Catalog
diff --git a/cmd/iceberg/args_test.go b/cmd/iceberg/args_test.go
index 4eab4cb1..75ccd1d6 100644
--- a/cmd/iceberg/args_test.go
+++ b/cmd/iceberg/args_test.go
@@ -143,6 +143,17 @@ func TestArgsParsing(t *testing.T) {
require.NotNil(t, a.Drop)
require.NotNil(t, a.Drop.Table)
assert.Equal(t, "prod.db.events",
a.Drop.Table.Identifier)
+ assert.False(t, a.Drop.Table.Purge)
+ },
+ },
+ {
+ name: "drop table with --purge",
+ args: []string{"drop", "table", "prod.db.events",
"--purge"},
+ check: func(t *testing.T, a Args) {
+ require.NotNil(t, a.Drop)
+ require.NotNil(t, a.Drop.Table)
+ assert.Equal(t, "prod.db.events",
a.Drop.Table.Identifier)
+ assert.True(t, a.Drop.Table.Purge)
},
},
{
diff --git a/cmd/iceberg/drop_test.go b/cmd/iceberg/drop_test.go
new file mode 100644
index 00000000..0c4458b1
--- /dev/null
+++ b/cmd/iceberg/drop_test.go
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+ "context"
+ "errors"
+ "iter"
+ "testing"
+
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/catalog"
+ "github.com/apache/iceberg-go/table"
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+)
+
+type mockCatalogForDrop struct {
+ catalogType catalog.Type
+ dropCalled bool
+ dropIdent table.Identifier
+ dropErr error
+ checkExists bool
+ checkExistsErr error
+}
+
+func (m *mockCatalogForDrop) CatalogType() catalog.Type {
+ return m.catalogType
+}
+
+func (m *mockCatalogForDrop) CreateTable(ctx context.Context, identifier
table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt)
(*table.Table, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) CommitTable(ctx context.Context, identifier
table.Identifier, requirements []table.Requirement, updates []table.Update)
(table.Metadata, string, error) {
+ return nil, "", nil
+}
+
+func (m *mockCatalogForDrop) ListTables(ctx context.Context, namespace
table.Identifier) iter.Seq2[table.Identifier, error] {
+ return nil
+}
+
+func (m *mockCatalogForDrop) LoadTable(ctx context.Context, identifier
table.Identifier) (*table.Table, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) DropTable(ctx context.Context, identifier
table.Identifier) error {
+ m.dropCalled = true
+ m.dropIdent = identifier
+
+ return m.dropErr
+}
+
+func (m *mockCatalogForDrop) RenameTable(ctx context.Context, from, to
table.Identifier) (*table.Table, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) CheckTableExists(ctx context.Context, identifier
table.Identifier) (bool, error) {
+ return m.checkExists, m.checkExistsErr
+}
+
+func (m *mockCatalogForDrop) ListNamespaces(ctx context.Context, parent
table.Identifier) ([]table.Identifier, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) CreateNamespace(ctx context.Context, namespace
table.Identifier, props iceberg.Properties) error {
+ return nil
+}
+
+func (m *mockCatalogForDrop) DropNamespace(ctx context.Context, namespace
table.Identifier) error {
+ return nil
+}
+
+func (m *mockCatalogForDrop) CheckNamespaceExists(ctx context.Context,
namespace table.Identifier) (bool, error) {
+ return false, nil
+}
+
+func (m *mockCatalogForDrop) LoadNamespaceProperties(ctx context.Context,
namespace table.Identifier) (iceberg.Properties, error) {
+ return nil, nil
+}
+
+func (m *mockCatalogForDrop) UpdateNamespaceProperties(ctx context.Context,
namespace table.Identifier, removals []string, updates iceberg.Properties)
(catalog.PropertiesUpdateSummary, error) {
+ return catalog.PropertiesUpdateSummary{}, nil
+}
+
+type mockPurgeableCatalog struct {
+ mockCatalogForDrop
+ purgeCalled bool
+ purgeIdent table.Identifier
+ purgeErr error
+}
+
+func (m *mockPurgeableCatalog) PurgeTable(ctx context.Context, identifier
table.Identifier) error {
+ m.purgeCalled = true
+ m.purgeIdent = identifier
+
+ return m.purgeErr
+}
+
+func TestRunDropTable(t *testing.T) {
+ cat := &mockCatalogForDrop{
+ catalogType: catalog.SQL,
+ }
+
+ cmd := &DropCmd{
+ Table: &DropTableCmd{
+ Identifier: "db.events",
+ Purge: false,
+ },
+ }
+
+ var errOut errCapture
+ runDrop(context.Background(), &errOut, cat, cmd)
+
+ assert.True(t, cat.dropCalled)
+ assert.Equal(t, table.Identifier{"db", "events"}, cat.dropIdent)
+ assert.NoError(t, errOut.lastErr)
+}
+
+func TestRunDropTablePurgeNotSupported(t *testing.T) {
+ cat := &mockCatalogForDrop{
+ catalogType: catalog.SQL,
+ }
+
+ cmd := &DropCmd{
+ Table: &DropTableCmd{
+ Identifier: "db.events",
+ Purge: true,
+ },
+ }
+
+ var errOut errCapture
+ exitCode := captureExit(func() {
+ runDrop(context.Background(), &errOut, cat, cmd)
+ })
+
+ assert.Equal(t, 1, exitCode)
+ require.Error(t, errOut.lastErr)
+ assert.Contains(t, errOut.lastErr.Error(), "does not support purge")
+}
+
+func TestRunDropTablePurgeSupported(t *testing.T) {
+ cat := &mockPurgeableCatalog{
+ mockCatalogForDrop: mockCatalogForDrop{
+ catalogType: catalog.SQL,
+ },
+ }
+
+ cmd := &DropCmd{
+ Table: &DropTableCmd{
+ Identifier: "db.events",
+ Purge: true,
+ },
+ }
+
+ var errOut errCapture
+ runDrop(context.Background(), &errOut, cat, cmd)
+
+ assert.True(t, cat.purgeCalled)
+ assert.Equal(t, table.Identifier{"db", "events"}, cat.purgeIdent)
+ assert.NoError(t, errOut.lastErr)
+}
+
+func TestRunDropTableError(t *testing.T) {
+ expectedErr := errors.New("some sql error")
+ cat := &mockCatalogForDrop{
+ catalogType: catalog.SQL,
+ dropErr: expectedErr,
+ }
+
+ cmd := &DropCmd{
+ Table: &DropTableCmd{
+ Identifier: "db.events",
+ Purge: false,
+ },
+ }
+
+ var errOut errCapture
+ exitCode := captureExit(func() {
+ runDrop(context.Background(), &errOut, cat, cmd)
+ })
+
+ assert.Equal(t, 1, exitCode)
+ assert.ErrorIs(t, errOut.lastErr, expectedErr)
+}
+
+func TestRunDropTablePurgeError(t *testing.T) {
+ expectedErr := errors.New("some error")
+ cat := &mockPurgeableCatalog{
+ mockCatalogForDrop: mockCatalogForDrop{
+ catalogType: catalog.SQL,
+ },
+ purgeErr: expectedErr,
+ }
+
+ cmd := &DropCmd{
+ Table: &DropTableCmd{
+ Identifier: "db.events",
+ Purge: true,
+ },
+ }
+
+ var errOut errCapture
+ exitCode := captureExit(func() {
+ runDrop(context.Background(), &errOut, cat, cmd)
+ })
+
+ assert.Equal(t, 1, exitCode)
+ assert.True(t, cat.purgeCalled)
+ assert.ErrorIs(t, errOut.lastErr, expectedErr)
+}
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index fca148fe..a590f0c3 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -112,6 +112,7 @@ type DropNamespaceCmd struct {
type DropTableCmd struct {
Identifier string `arg:"positional,required" help:"fully qualified
table"`
+ Purge bool `arg:"--purge" help:"physically delete all table
files"`
}
type DropCmd struct {
@@ -534,13 +535,24 @@ func runDrop(ctx context.Context, output Output, cat
catalog.Catalog, cmd *DropC
err := cat.DropNamespace(ctx,
catalog.ToIdentifier(cmd.Namespace.Identifier))
if err != nil {
output.Error(err)
- os.Exit(1)
+ osExit(1)
}
case cmd.Table != nil:
- err := cat.DropTable(ctx,
catalog.ToIdentifier(cmd.Table.Identifier))
+ ident := catalog.ToIdentifier(cmd.Table.Identifier)
+ var err error
+ if cmd.Table.Purge {
+ if purger, ok := cat.(catalog.PurgeableTable); ok {
+ err = purger.PurgeTable(ctx, ident)
+ } else {
+ output.Error(fmt.Errorf("catalog %s does not
support purge", cat.CatalogType()))
+ osExit(1)
+ }
+ } else {
+ err = cat.DropTable(ctx, ident)
+ }
if err != nil {
output.Error(err)
- os.Exit(1)
+ osExit(1)
}
}
}
diff --git a/go.mod b/go.mod
index 937fc5a2..e362ecb7 100644
--- a/go.mod
+++ b/go.mod
@@ -36,6 +36,7 @@ require (
github.com/beltran/gohive v1.8.1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/docker/docker v28.5.2+incompatible
+ github.com/geoarrow/geoarrow-go v0.0.0-20260403143023-f54751c3e3a1
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
@@ -144,7 +145,6 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsevents v0.2.0 // indirect
github.com/fvbommel/sortorder v1.1.0 // indirect
- github.com/geoarrow/geoarrow-go v0.0.0-20260403143023-f54751c3e3a1 //
indirect
github.com/go-jose/go-jose/v4 v4.1.4 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
diff --git a/table/orphan_cleanup.go b/table/orphan_cleanup.go
index efc1e21a..7c945228 100644
--- a/table/orphan_cleanup.go
+++ b/table/orphan_cleanup.go
@@ -22,10 +22,13 @@ import (
"errors"
"fmt"
stdfs "io/fs"
+ "log/slog"
"net/url"
+ "os"
"path/filepath"
"reflect"
"runtime"
+ "slices"
"strings"
"sync"
"time"
@@ -192,7 +195,7 @@ func (t Table) executeOrphanCleanup(ctx context.Context,
cfg *orphanCleanupConfi
scanLocation = t.metadata.Location()
}
- referencedFiles, err := t.getReferencedFiles(fs)
+ referencedFiles, err := t.getReferencedFiles(fs, true)
if err != nil {
return OrphanCleanupResult{}, fmt.Errorf("failed to get
referenced files: %w", err)
}
@@ -228,36 +231,43 @@ func (t Table) executeOrphanCleanup(ctx context.Context,
cfg *orphanCleanupConfi
// getReferencedFiles collects all files referenced by table metadata:
previous metadata
// files, statistics and partition-statistics paths (Puffin, etc.), and all
paths reachable
// from current snapshots (manifest lists, manifests, data files).
-func (t Table) getReferencedFiles(fs iceio.IO) (map[string]bool, error) {
+//
+// If the table has snapshots, fs must not be nil, otherwise an error is
returned.
+// All returned paths are normalized using the package-level normalizeFilePath
function.
+func (t Table) getReferencedFiles(fs iceio.IO, discardDeleted bool)
(map[string]bool, error) {
referenced := make(map[string]bool)
metadata := t.metadata
for entry := range metadata.PreviousFiles() {
- referenced[entry.MetadataFile] = true
+ referenced[normalizeFilePath(entry.MetadataFile)] = false
}
- referenced[t.metadataLocation] = true
+ referenced[normalizeFilePath(t.metadataLocation)] = false
// Add version hint file (for Hadoop-style tables)
// Following Java's ReachableFileUtil.versionHintLocation() logic:
versionHintPath := filepath.Join(metadata.Location(), "metadata",
"version-hint.text")
- referenced[versionHintPath] = true
+ referenced[normalizeFilePath(versionHintPath)] = false
for sf := range metadata.Statistics() {
// Guard against malformed metadata; statistics-path is
required per spec.
if sf.StatisticsPath != "" {
- referenced[sf.StatisticsPath] = true
+ referenced[normalizeFilePath(sf.StatisticsPath)] = false
}
}
for psf := range metadata.PartitionStatistics() {
// Guard against malformed metadata; statistics-path is
required per spec.
if psf.StatisticsPath != "" {
- referenced[psf.StatisticsPath] = true
+ referenced[normalizeFilePath(psf.StatisticsPath)] =
false
}
}
+ if len(metadata.Snapshots()) > 0 && fs == nil {
+ return nil, errors.New("fs cannot be nil when table has
snapshots")
+ }
+
for _, snapshot := range metadata.Snapshots() {
if snapshot.ManifestList != "" {
- referenced[snapshot.ManifestList] = true
+ referenced[normalizeFilePath(snapshot.ManifestList)] =
false
}
manifestFiles, err := snapshot.Manifests(fs)
@@ -266,7 +276,7 @@ func (t Table) getReferencedFiles(fs iceio.IO)
(map[string]bool, error) {
}
for _, manifest := range manifestFiles {
- referenced[manifest.FilePath()] = true
+ referenced[normalizeFilePath(manifest.FilePath())] =
false
// discardDeleted=true: skip DELETED-status entries when
// computing the reachable file set. A DELETED entry is
@@ -274,11 +284,19 @@ func (t Table) getReferencedFiles(fs iceio.IO)
(map[string]bool, error) {
// against orphan cleanup once the snapshot that
// originally held it live has been expired.
// This matches iceberg-java and pyiceberg behavior.
- for entry, err := range manifest.Entries(fs, true) {
+ for entry, err := range manifest.Entries(fs,
discardDeleted) {
if err != nil {
return nil, fmt.Errorf("failed to read
manifest entries: %w", err)
}
- referenced[entry.DataFile().FilePath()] = true
+ // All files tracked within a manifest (data
files, equality deletes, position deletes)
+ // are considered "data files" for the purposes
of gc.enabled.
+
referenced[normalizeFilePath(entry.DataFile().FilePath())] = true
+ if ref :=
entry.DataFile().ReferencedDataFile(); ref != nil {
+ // This is a deletion vector entry
referencing a data file.
+ // Its FilePath() is the deletion
vector (.dv) file itself (added above).
+ // We must also mark the referenced
data file as referenced.
+ referenced[normalizeFilePath(*ref)] =
true
+ }
}
}
}
@@ -396,7 +414,7 @@ func makeFileWalkFunc(fn func(path string, info
stdfs.FileInfo) error, pathTrans
func identifyOrphanFiles(allFiles []string, referencedFiles map[string]bool,
cfg *orphanCleanupConfig) ([]string, error) {
normalizedReferencedFiles := make(map[string]string)
for refPath := range referencedFiles {
- normalizedPath := normalizeFilePath(refPath, cfg)
+ normalizedPath := normalizeFilePathWithConfig(refPath, cfg)
normalizedReferencedFiles[normalizedPath] = refPath
// Also include the original path for direct lookup
normalizedReferencedFiles[refPath] = refPath
@@ -419,7 +437,7 @@ func identifyOrphanFiles(allFiles []string, referencedFiles
map[string]bool, cfg
}
func isFileOrphan(file string, referencedFiles map[string]bool,
normalizedReferencedFiles map[string]string, cfg *orphanCleanupConfig) (bool,
error) {
- normalizedFile := normalizeFilePath(file, cfg)
+ normalizedFile := normalizeFilePathWithConfig(file, cfg)
if referencedFiles[file] || referencedFiles[normalizedFile] {
return false, nil
@@ -547,13 +565,37 @@ func deleteFilesParallel(fs iceio.IO, orphanFiles
[]string, cfg *orphanCleanupCo
// 3. Path separators and casing may vary across different systems and
configurations
// 4. Without normalization, semantically identical paths would be treated as
different,
// leading to false positives in orphan detection
-func normalizeFilePath(path string, cfg *orphanCleanupConfig) string {
+//
+// normalizeFilePath normalizes a file path for comparison, handling schemes,
authorities, and separators.
+// It also aligns file:// URIs and bare local file system paths so they
normalize to the same format.
+func normalizeFilePath(path string) string {
+ return normalizeFilePathWithConfig(path, nil)
+}
+
+func normalizeFilePathWithConfig(path string, cfg *orphanCleanupConfig) string
{
+ if strings.HasPrefix(path, "file:") {
+ if u, err := url.Parse(path); err == nil {
+ host := strings.ToLower(u.Host)
+ if host == "" || host == "localhost" {
+ pathStr := u.Path
+ // Intercept Windows drive letters (e.g., /C:/)
and strip the leading slash
+ if len(pathStr) >= 3 && pathStr[0] == '/' &&
pathStr[2] == ':' {
+ pathStr = pathStr[1:]
+ }
+
+ return filepath.Clean(pathStr)
+ }
+ // Remote authority – keep it as //host/path
+ return filepath.Clean("//" + u.Host + u.Path)
+ }
+ }
+
// Handle URL-based paths (s3://, gs://, etc.)
if strings.Contains(path, "://") {
return normalizeURLPath(path, cfg)
- } else {
- return normalizeNonURLPath(path)
}
+
+ return normalizeNonURLPath(path)
}
// normalizeURLPath normalizes URL-based file paths with scheme/authority equivalence.
@@ -577,8 +619,15 @@ func normalizeURLPath(path string, cfg *orphanCleanupConfig) string {
return normalizeNonURLPath(path)
}
- normalizedScheme := applySchemeEquivalence(parsedURL.Scheme,
cfg.equalSchemes)
- normalizedAuthority := applyAuthorityEquivalence(parsedURL.Host,
cfg.equalAuthorities)
+ var equalSchemes map[string]string
+ var equalAuthorities map[string]string
+ if cfg != nil {
+ equalSchemes = cfg.equalSchemes
+ equalAuthorities = cfg.equalAuthorities
+ }
+
+ normalizedScheme := applySchemeEquivalence(parsedURL.Scheme,
equalSchemes)
+ normalizedAuthority := applyAuthorityEquivalence(parsedURL.Host,
equalAuthorities)
normalizedURL := &url.URL{
Scheme: normalizedScheme,
Host: normalizedAuthority,
@@ -708,3 +757,105 @@ func checkPrefixMismatch(referencedPath, filesystemPath string, cfg *orphanClean
return fmt.Errorf("unknown prefix mismatch mode: %d",
cfg.prefixMismatchMode)
}
}
+
+// PurgeFiles physically deletes all files under the table's warehouse location
+// and any referenced files written outside the location root (e.g., via write.data.path
+// or write.metadata.path properties).
+//
+// It operates on a best-effort basis. Errors from individual file deletions are
+// collected and returned together. If files cannot be deleted (e.g. due to
+// permission errors or missing paths), the errors are returned to the caller; the
+// catalog drop operation should typically still proceed so the catalog does not
+// get out of sync with storage.
+func (t Table) PurgeFiles(ctx context.Context) error {
+ gcEnabled := t.Metadata().Properties().GetBool("gc.enabled", true)
+
+ fs, err := t.FS(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to load filesystem for table purge:
%w", err)
+ }
+
+ var errs []error
+ fileSet := make(map[string]string)
+ location := t.metadata.Location()
+
+ // 1. Walk the table location directory tree to capture all local files
+ // Only walk the directory if gc.enabled=true to prevent accidental
deletion
+ // of unreferenced branched data files.
+ if gcEnabled {
+ if listable, ok := fs.(iceio.ListableIO); ok {
+ walkErr := listable.WalkDir(location, func(path string,
d stdfs.DirEntry, err error) error {
+ if err := ctx.Err(); err != nil {
+ return err
+ }
+ if err != nil {
+ if os.IsNotExist(err) || errors.Is(err,
stdfs.ErrNotExist) {
+ return nil
+ }
+
+ return err
+ }
+ if !d.IsDir() {
+ fileSet[normalizeFilePath(path)] = path
+ }
+
+ return nil
+ })
+ if walkErr != nil && !os.IsNotExist(walkErr) &&
!errors.Is(walkErr, stdfs.ErrNotExist) {
+ errs = append(errs, fmt.Errorf("failed walking
directory %s: %w", location, walkErr))
+ }
+ }
+ }
+
+ // 2. Union in manifest-referenced and metadata files (which might be
outside the table location)
+ referencedFiles, refErr := t.getReferencedFiles(fs, false)
+ if refErr != nil {
+ return fmt.Errorf("failed to get referenced files: %w", refErr)
+ }
+
+ for path, isData := range referencedFiles {
+ if !gcEnabled && isData {
+ slog.WarnContext(ctx, "purge: skipping data file,
gc.enabled=false", "path", path)
+
+ continue
+ }
+
+ norm := normalizeFilePath(path)
+ if _, ok := fileSet[norm]; !ok {
+ fileSet[norm] = path
+ }
+ }
+
+ // Convert to slice and sort for deterministic behavior
+ files := make([]string, 0, len(fileSet))
+ for _, orig := range fileSet {
+ files = append(files, orig)
+ }
+ slices.Sort(files)
+
+ if len(files) > 0 {
+ if bulk, ok := fs.(iceio.BulkRemovableIO); ok {
+ _, bulkErr := bulk.DeleteFiles(ctx, files)
+ if bulkErr != nil {
+ errs = append(errs, fmt.Errorf("bulk deletion
failed: %w", bulkErr))
+ }
+ } else {
+ for _, file := range files {
+ if err := ctx.Err(); err != nil {
+ errs = append(errs, err)
+
+ break
+ }
+ if rmErr := fs.Remove(file); rmErr != nil &&
!os.IsNotExist(rmErr) {
+ errs = append(errs, fmt.Errorf("failed
to remove %s: %w", file, rmErr))
+ }
+ }
+ }
+ }
+
+ if len(errs) > 0 {
+ return errors.Join(errs...)
+ }
+
+ return nil
+}
diff --git a/table/orphan_cleanup_test.go b/table/orphan_cleanup_test.go
index 43b01266..e77f90db 100644
--- a/table/orphan_cleanup_test.go
+++ b/table/orphan_cleanup_test.go
@@ -117,7 +117,7 @@ func TestNormalizeFilePath(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- result := normalizeFilePath(tt.input, cfg)
+ result := normalizeFilePathWithConfig(tt.input, cfg)
assert.Equal(t, tt.expected, result)
})
}
@@ -365,7 +365,7 @@ func TestIsFileOrphan(t *testing.T) {
}
normalizedReferencedFiles := make(map[string]string)
for refPath := range referencedFiles {
- normalizedPath := normalizeFilePath(refPath, cfg)
+ normalizedPath := normalizeFilePathWithConfig(refPath, cfg)
normalizedReferencedFiles[normalizedPath] = refPath
}
@@ -529,13 +529,13 @@ func TestGetReferencedFiles_IncludesStatisticsFiles(t *testing.T) {
}
// No snapshots: FileIO is not used; statistics paths must still be referenced.
- refs, err := tbl.getReferencedFiles(nil)
+ refs, err := tbl.getReferencedFiles(nil, true)
require.NoError(t, err)
- assert.True(t, refs["s3://bucket/stats/table-stats.puffin"])
- assert.True(t, refs["s3://bucket/stats/part-stats.puffin"])
- assert.True(t, refs[tbl.metadataLocation])
- assert.False(t, refs["s3://bucket/stats/not-referenced.puffin"])
+ assert.Contains(t, refs,
normalizeFilePath("s3://bucket/stats/table-stats.puffin"))
+ assert.Contains(t, refs,
normalizeFilePath("s3://bucket/stats/part-stats.puffin"))
+ assert.Contains(t, refs, normalizeFilePath(tbl.metadataLocation))
+ assert.False(t,
refs[normalizeFilePath("s3://bucket/stats/not-referenced.puffin")])
assert.False(t, refs[""])
}
@@ -716,12 +716,12 @@ func TestGetReferencedFiles_OverwriteThenExpireExcludesTombstones(t *testing.T)
// fileA is now referenced only via a DELETED entry in the surviving
// snapshot's tombstone manifest. The fix must exclude it.
- refs, err := tbl.getReferencedFiles(fs)
+ refs, err := tbl.getReferencedFiles(fs, true)
require.NoError(t, err)
- assert.True(t, refs[fileB],
+ assert.Contains(t, refs, normalizeFilePath(fileB),
"new live file (ADDED in surviving snapshot) must be in
reference set")
- assert.False(t, refs[fileA],
+ assert.NotContains(t, refs, normalizeFilePath(fileA),
"overwritten file (only present as DELETED tombstone) must NOT
be in reference set")
}