This is an automated email from the ASF dual-hosted git repository.

laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 7db5a383 feat(catalog): add PurgeTable as optional interface for 
physical file deletion (#1104)
7db5a383 is described below

commit 7db5a38379e49e2cf2848ee745833dd9f6c66e61
Author: Mithil Girish <[email protected]>
AuthorDate: Sat May 23 03:09:04 2026 +0530

    feat(catalog): add PurgeTable as optional interface for physical file 
deletion (#1104)
    
    Fixes #1092
    
    ## Summary
    Adds `PurgeableTable` as an optional interface so catalogs can support
    physical deletion of table files when dropping a table.
    
    Today, `DropTable` only removes the catalog metadata entry — physical
    files (Parquet, manifests, metadata JSON) remain on storage. This PR
    closes that gap for client-side catalogs without making any breaking
    changes to the `Catalog.DropTable` signature.
    
    ## Changes
    
    ### Core
    - `catalog/catalog.go` — add `PurgeableTable` optional interface
    - `table/orphan_cleanup.go` — add the shared `Table.PurgeFiles` helper
    (called by each client-side catalog's `PurgeTable`), using `WalkDir` +
    `BulkRemovableIO` fallback for storage-agnostic deletion
    
    ### Per-catalog
    - **SQL**: load table metadata, drop catalog entry, walk and delete all
    files under the table location via `WalkDir` + `BulkRemovableIO`
    fallback
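    - **Glue**: same pattern; a `LoadTable` failure is returned to the
    caller, while file-deletion failures after the catalog drop are only
    logged as warnings (e.g. missing S3 permissions)
    - **Hive**: same pattern; load failures are returned, post-drop purge
    failures are logged as warnings
    - **Hadoop**: purges referenced files first (while the table directory
    still exists), then delegates to `DropTable`, which removes the
    directory tree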
    - **REST**: existing `PurgeTable` (`purgeRequested=true`) satisfies the
      interface automatically — no changes needed
    
    ### CLI
    - `cmd/iceberg/main.go` — wire `--purge` flag into `drop table` command
    via type assertion against `PurgeableTable`; exits with an error if the
      catalog does not support client-side purge
    
    ## Notes
    - No breaking changes to `Catalog.DropTable` signature
    - Client-side purge is best-effort — if interrupted between catalog drop
      and file deletion, orphaned files may remain. For reliable cleanup,
      prefer `expire-snapshots` + `remove-orphan-files`
    - REST catalog delegates purge to the server as before
    
    ## Testing
    - `catalog/sql/sql_test.go` — verifies physical file deletion on local
    storage
    - `catalog/glue/glue_test.go` — mock test for purge invocation and
    fallback
    - `catalog/hive/hive_test.go` — mock test for purge invocation and
    fallback
    - `cmd/iceberg/drop_test.go` — CLI flag parsing and purge dispatch tests
    
    ---------
    
    Signed-off-by: Mithil Girish <[email protected]>
---
 catalog/catalog.go           |  23 +++++
 catalog/glue/glue.go         |  23 +++++
 catalog/hadoop/hadoop.go     |  17 ++++
 catalog/hive/hive.go         |  23 +++++
 catalog/rest/rest.go         |   2 +
 catalog/sql/sql.go           |  23 +++++
 catalog/sql/sql_test.go      | 160 ++++++++++++++++++++++++++++++
 cmd/iceberg/args_test.go     |  11 +++
 cmd/iceberg/drop_test.go     | 226 +++++++++++++++++++++++++++++++++++++++++++
 cmd/iceberg/main.go          |  18 +++-
 go.mod                       |   2 +-
 table/orphan_cleanup.go      | 187 +++++++++++++++++++++++++++++++----
 table/orphan_cleanup_test.go |  20 ++--
 13 files changed, 703 insertions(+), 32 deletions(-)

diff --git a/catalog/catalog.go b/catalog/catalog.go
index b48f0c49..7781901b 100644
--- a/catalog/catalog.go
+++ b/catalog/catalog.go
@@ -167,6 +167,29 @@ type TransactionalCatalog interface {
        CommitTransaction(ctx context.Context, commits []table.TableCommit) 
error
 }
 
+// PurgeableTable is an optional interface that catalogs can implement
+// to support physical table deletion (catalog entry + underlying files).
+// Callers should check for this capability via a type assertion:
+//
+//     if purger, ok := cat.(catalog.PurgeableTable); ok {
+//         err := purger.PurgeTable(ctx, ident)
+//     }
+//
+// For REST catalogs the purge is delegated server-side. For client-side
+// catalogs (SQL, Glue, Hive, Hadoop) the table is first dropped from
+// the catalog, then all files under the table's [table.Metadata.Location]
+// root, plus any referenced files written outside the root (e.g. via
+// write.data.path or write.metadata.path table properties), are physically 
deleted.
+//
+// Physical file deletion is performed on a best-effort basis. If gc.enabled is
+// set to false in the table properties, data files are left in place while
+// metadata files are still removed. Any file-deletion, directory walk, or
+// listing errors encountered are logged as warnings, and the operation succeeds
+// (returns nil) so that the catalog drop remains the single source of truth.
+type PurgeableTable interface {
+       PurgeTable(ctx context.Context, identifier table.Identifier) error
+}
+
 func ToIdentifier(ident ...string) table.Identifier {
        if len(ident) == 1 {
                if ident[0] == "" {
diff --git a/catalog/glue/glue.go b/catalog/glue/glue.go
index 0611aef3..f623730f 100644
--- a/catalog/glue/glue.go
+++ b/catalog/glue/glue.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "iter"
+       "log"
        "maps"
        "strconv"
        "strings"
@@ -137,6 +138,8 @@ type glueAPI interface {
        UpdateDatabase(ctx context.Context, params *glue.UpdateDatabaseInput, 
optFns ...func(*glue.Options)) (*glue.UpdateDatabaseOutput, error)
 }
 
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
 type Catalog struct {
        glueSvc   glueAPI
        catalogId *string
@@ -405,6 +408,26 @@ func (c *Catalog) DropTable(ctx context.Context, 
identifier table.Identifier) er
        return nil
 }
 
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier) 
error {
+       tbl, err := c.LoadTable(ctx, identifier)
+       if err != nil {
+               return err
+       }
+
+       // Drop the table entry from the catalog first
+       err = c.DropTable(ctx, identifier)
+       if err != nil {
+               return err
+       }
+
+       // Physically delete all table files on storage best-effort
+       if purgeErr := tbl.PurgeFiles(ctx); purgeErr != nil {
+               log.Printf("WARNING: dropped table %s but failed to purge 
files: %v", identifier, purgeErr)
+       }
+
+       return nil
+}
+
 // RenameTable renames an Iceberg table in the Glue catalog.
 func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) 
(*table.Table, error) {
        fromDatabase, fromTable, err := identifierToGlueTable(from)
diff --git a/catalog/hadoop/hadoop.go b/catalog/hadoop/hadoop.go
index be7e8c24..bb16f3d3 100644
--- a/catalog/hadoop/hadoop.go
+++ b/catalog/hadoop/hadoop.go
@@ -95,6 +95,8 @@ func validateIdentifier(ident table.Identifier) error {
 // Catalog is a filesystem-based Iceberg catalog that requires no external
 // metastore. All state lives on disk as directories and versioned JSON
 // metadata files. Currently only local filesystem paths are supported.
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
 type Catalog struct {
        name      string
        warehouse string
@@ -561,6 +563,21 @@ func (c *Catalog) DropTable(_ context.Context, ident 
table.Identifier) error {
        return os.RemoveAll(tablePath)
 }
 
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier) 
error {
+       tbl, err := c.LoadTable(ctx, identifier)
+       if err != nil {
+               return err
+       }
+
+       // For Hadoop catalog, physical files walk must run BEFORE deleting the 
table directory root
+       if purgeErr := tbl.PurgeFiles(ctx); purgeErr != nil {
+               log.Printf("WARNING: failing to purge some files in Hadoop 
table %s: %v", identifier, purgeErr)
+       }
+
+       // Delete the table directory root from the local storage
+       return c.DropTable(ctx, identifier)
+}
+
 func (c *Catalog) RenameTable(_ context.Context, _, _ table.Identifier) 
(*table.Table, error) {
        return nil, errors.New("hadoop catalog: rename table is not supported")
 }
diff --git a/catalog/hive/hive.go b/catalog/hive/hive.go
index cd86aebc..e88d30a2 100644
--- a/catalog/hive/hive.go
+++ b/catalog/hive/hive.go
@@ -22,6 +22,7 @@ import (
        "errors"
        "fmt"
        "iter"
+       "log"
        "maps"
        "strings"
        _ "unsafe"
@@ -49,6 +50,8 @@ func init() {
        }))
 }
 
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
 type Catalog struct {
        client HiveClient
        opts   *HiveOptions
@@ -391,6 +394,26 @@ func (c *Catalog) DropTable(ctx context.Context, 
identifier table.Identifier) er
        return nil
 }
 
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier) 
error {
+       tbl, err := c.LoadTable(ctx, identifier)
+       if err != nil {
+               return err
+       }
+
+       // Drop the table entry from the catalog first
+       err = c.DropTable(ctx, identifier)
+       if err != nil {
+               return err
+       }
+
+       // Physically delete all table files on storage best-effort
+       if purgeErr := tbl.PurgeFiles(ctx); purgeErr != nil {
+               log.Printf("WARNING: dropped table %s but failed to purge 
files: %v", identifier, purgeErr)
+       }
+
+       return nil
+}
+
 func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) 
(*table.Table, error) {
        fromDB, fromTable, err := identifierToTableName(from)
        if err != nil {
diff --git a/catalog/rest/rest.go b/catalog/rest/rest.go
index 3950afb7..703a48d5 100644
--- a/catalog/rest/rest.go
+++ b/catalog/rest/rest.go
@@ -520,6 +520,8 @@ func toProps(o *options) iceberg.Properties {
        return props
 }
 
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
 type Catalog struct {
        baseURI *url.URL
        cl      *http.Client
diff --git a/catalog/sql/sql.go b/catalog/sql/sql.go
index 2c196b70..1c2d6b29 100644
--- a/catalog/sql/sql.go
+++ b/catalog/sql/sql.go
@@ -23,6 +23,7 @@ import (
        "errors"
        "fmt"
        "iter"
+       "log"
        "maps"
        "slices"
        "strings"
@@ -169,6 +170,8 @@ func withWriteTx(ctx context.Context, db *bun.DB, fn 
func(context.Context, bun.T
        })
 }
 
+var _ catalog.PurgeableTable = (*Catalog)(nil)
+
 type Catalog struct {
        db    *bun.DB
        name  string
@@ -465,6 +468,26 @@ func (c *Catalog) DropTable(ctx context.Context, 
identifier table.Identifier) er
        })
 }
 
+func (c *Catalog) PurgeTable(ctx context.Context, identifier table.Identifier) 
error {
+       tbl, err := c.LoadTable(ctx, identifier)
+       if err != nil {
+               return err
+       }
+
+       // Drop the table entry from the catalog first
+       err = c.DropTable(ctx, identifier)
+       if err != nil {
+               return err
+       }
+
+       // Physically delete all table files on storage best-effort
+       if purgeErr := tbl.PurgeFiles(ctx); purgeErr != nil {
+               log.Printf("WARNING: dropped table %s but failed to purge 
files: %v", identifier, purgeErr)
+       }
+
+       return nil
+}
+
 func (c *Catalog) RenameTable(ctx context.Context, from, to table.Identifier) 
(*table.Table, error) {
        fromNs := strings.Join(catalog.NamespaceFromIdent(from), ".")
        fromTbl := catalog.TableNameFromIdent(from)
diff --git a/catalog/sql/sql_test.go b/catalog/sql/sql_test.go
index feac9308..b0b8eade 100644
--- a/catalog/sql/sql_test.go
+++ b/catalog/sql/sql_test.go
@@ -20,6 +20,7 @@ package sql_test
 import (
        "context"
        "database/sql"
+       "encoding/json"
        "fmt"
        "maps"
        "math/rand/v2"
@@ -29,6 +30,7 @@ import (
        "strings"
        "testing"
 
+       "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/apache/arrow-go/v18/parquet/pqarrow"
@@ -684,6 +686,164 @@ func (s *SqliteCatalogTestSuite) TestDropTableNotExist() {
        }
 }
 
+func (s *SqliteCatalogTestSuite) TestPurgeTable() {
+       tests := []struct {
+               cat   *sqlcat.Catalog
+               tblID table.Identifier
+       }{
+               {s.getCatalogMemory(), s.randomTableIdentifier()},
+               {s.getCatalogSqlite(), s.randomHierarchicalIdentifier()},
+       }
+
+       for _, tt := range tests {
+               ns := catalog.NamespaceFromIdent(tt.tblID)
+               
s.Require().NoError(tt.cat.CreateNamespace(context.Background(), ns, nil))
+
+               schema := iceberg.NewSchema(1, iceberg.NestedField{
+                       ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.String, Required: true,
+               })
+               tbl, err := tt.cat.CreateTable(context.Background(), tt.tblID, 
schema)
+               s.Require().NoError(err)
+
+               // Append data to create data files and manifest files
+               arrowSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "foo", Type: arrow.BinaryTypes.String},
+               }, nil)
+
+               bldr := array.NewStringBuilder(memory.DefaultAllocator)
+               bldr.Append("bar")
+               arr := bldr.NewArray()
+
+               rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 1)
+               arrTable := array.NewTableFromRecords(arrowSchema, 
[]arrow.RecordBatch{rec})
+
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AppendTable(context.Background(), 
arrTable, 1024, nil))
+               tbl, err = tx.Commit(context.Background())
+               s.Require().NoError(err)
+
+               arr.Release()
+               bldr.Release()
+               rec.Release()
+               arrTable.Release()
+
+               metaLoc := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+               tableLoc := strings.TrimPrefix(tbl.Location(), "file://")
+               s.FileExists(metaLoc)
+
+               // Create a dummy statistics file at an external path outside 
the table Location()
+               externalStatsPath := filepath.Join(s.warehouse, 
"external-path", "stats.puffin")
+               
s.Require().NoError(os.MkdirAll(filepath.Dir(externalStatsPath), 0o755))
+               s.Require().NoError(os.WriteFile(externalStatsPath, 
[]byte("dummy puffin data"), 0o644))
+               s.FileExists(externalStatsPath)
+
+               // Add this external statistics file to the table metadata JSON
+               metaBytes, err := os.ReadFile(metaLoc)
+               s.Require().NoError(err)
+               var metaMap map[string]any
+               s.Require().NoError(json.Unmarshal(metaBytes, &metaMap))
+               metaMap["statistics"] = []any{
+                       map[string]any{
+                               "snapshot-id":               
tbl.Metadata().CurrentSnapshot().SnapshotID,
+                               "statistics-path":           "file://" + 
externalStatsPath,
+                               "file-size-in-bytes":        17,
+                               "file-footer-size-in-bytes": 10,
+                               "blob-metadata":             []any{},
+                       },
+               }
+               newMetaBytes, err := json.Marshal(metaMap)
+               s.Require().NoError(err)
+               s.Require().NoError(os.WriteFile(metaLoc, newMetaBytes, 0o644))
+
+               // Assert that the catalog implements PurgeableTable
+               purger, ok := any(tt.cat).(catalog.PurgeableTable)
+               s.Require().True(ok, "catalog must implement PurgeableTable")
+
+               s.NoError(purger.PurgeTable(context.Background(), tt.tblID))
+
+               // The table catalog entry should be gone
+               _, err = tt.cat.LoadTable(context.Background(), tt.tblID)
+               s.ErrorIs(err, catalog.ErrNoSuchTable)
+
+               // The external file must also be purged!
+               _, statErr := os.Stat(externalStatsPath)
+               s.True(os.IsNotExist(statErr), "expected external file to be 
deleted: %s", externalStatsPath)
+
+               // The physical files should be completely removed.
+               // Note: PurgeTableFiles relies on ListableIO and 
BulkRemovableIO which operate on files, not directories.
+               // As a result, empty directories (like data/ and metadata/) 
may be left behind on local filesystems.
+               // This is expected and acceptable since object stores (like 
S3) do not have true directories.
+               walkErr := filepath.Walk(tableLoc, func(path string, info 
os.FileInfo, err error) error {
+                       if os.IsNotExist(err) {
+                               return nil
+                       }
+                       s.Require().NoError(err)
+                       s.True(info.IsDir(), "expected only empty directories 
to remain, found file: %s", path)
+
+                       return nil
+               })
+               s.Require().NoError(walkErr)
+       }
+}
+
+func (s *SqliteCatalogTestSuite) TestPurgeTableGCDisabled() {
+       tests := []struct {
+               cat   *sqlcat.Catalog
+               tblID table.Identifier
+       }{
+               {s.getCatalogMemory(), s.randomTableIdentifier()},
+               {s.getCatalogSqlite(), s.randomHierarchicalIdentifier()},
+       }
+
+       for _, tt := range tests {
+               ns := catalog.NamespaceFromIdent(tt.tblID)
+               
s.Require().NoError(tt.cat.CreateNamespace(context.Background(), ns, nil))
+
+               schema := iceberg.NewSchema(1, iceberg.NestedField{
+                       ID: 1, Name: "foo", Type: 
iceberg.PrimitiveTypes.String, Required: true,
+               })
+               tbl, err := tt.cat.CreateTable(context.Background(), tt.tblID, 
schema, catalog.WithProperties(iceberg.Properties{"gc.enabled": "false"}))
+               s.Require().NoError(err)
+
+               // Append data to create data files and manifest files
+               arrowSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "foo", Type: arrow.BinaryTypes.String},
+               }, nil)
+
+               bldr := array.NewStringBuilder(memory.DefaultAllocator)
+               bldr.Append("bar")
+               arr := bldr.NewArray()
+
+               rec := array.NewRecordBatch(arrowSchema, []arrow.Array{arr}, 1)
+               arrTable := array.NewTableFromRecords(arrowSchema, 
[]arrow.RecordBatch{rec})
+
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AppendTable(context.Background(), 
arrTable, 1024, nil))
+               tbl, err = tx.Commit(context.Background())
+               s.Require().NoError(err)
+
+               arr.Release()
+               bldr.Release()
+               rec.Release()
+               arrTable.Release()
+
+               metaLoc := strings.TrimPrefix(tbl.MetadataLocation(), "file://")
+               s.FileExists(metaLoc)
+
+               purger, ok := any(tt.cat).(catalog.PurgeableTable)
+               s.Require().True(ok, "catalog must implement PurgeableTable")
+
+               s.NoError(purger.PurgeTable(context.Background(), tt.tblID))
+
+               // The table catalog entry should be gone
+               _, err = tt.cat.LoadTable(context.Background(), tt.tblID)
+               s.ErrorIs(err, catalog.ErrNoSuchTable)
+
+               // With gc.enabled=false, metadata files are still deleted, but 
data files are skipped.
+               s.NoFileExists(metaLoc)
+       }
+}
+
 func (s *SqliteCatalogTestSuite) TestRenameTable() {
        tests := []struct {
                cat       *sqlcat.Catalog
diff --git a/cmd/iceberg/args_test.go b/cmd/iceberg/args_test.go
index 4eab4cb1..75ccd1d6 100644
--- a/cmd/iceberg/args_test.go
+++ b/cmd/iceberg/args_test.go
@@ -143,6 +143,17 @@ func TestArgsParsing(t *testing.T) {
                                require.NotNil(t, a.Drop)
                                require.NotNil(t, a.Drop.Table)
                                assert.Equal(t, "prod.db.events", 
a.Drop.Table.Identifier)
+                               assert.False(t, a.Drop.Table.Purge)
+                       },
+               },
+               {
+                       name: "drop table with --purge",
+                       args: []string{"drop", "table", "prod.db.events", 
"--purge"},
+                       check: func(t *testing.T, a Args) {
+                               require.NotNil(t, a.Drop)
+                               require.NotNil(t, a.Drop.Table)
+                               assert.Equal(t, "prod.db.events", 
a.Drop.Table.Identifier)
+                               assert.True(t, a.Drop.Table.Purge)
                        },
                },
                {
diff --git a/cmd/iceberg/drop_test.go b/cmd/iceberg/drop_test.go
new file mode 100644
index 00000000..0c4458b1
--- /dev/null
+++ b/cmd/iceberg/drop_test.go
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package main
+
+import (
+       "context"
+       "errors"
+       "iter"
+       "testing"
+
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/catalog"
+       "github.com/apache/iceberg-go/table"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+)
+
+type mockCatalogForDrop struct {
+       catalogType    catalog.Type
+       dropCalled     bool
+       dropIdent      table.Identifier
+       dropErr        error
+       checkExists    bool
+       checkExistsErr error
+}
+
+func (m *mockCatalogForDrop) CatalogType() catalog.Type {
+       return m.catalogType
+}
+
+func (m *mockCatalogForDrop) CreateTable(ctx context.Context, identifier 
table.Identifier, schema *iceberg.Schema, opts ...catalog.CreateTableOpt) 
(*table.Table, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) CommitTable(ctx context.Context, identifier 
table.Identifier, requirements []table.Requirement, updates []table.Update) 
(table.Metadata, string, error) {
+       return nil, "", nil
+}
+
+func (m *mockCatalogForDrop) ListTables(ctx context.Context, namespace 
table.Identifier) iter.Seq2[table.Identifier, error] {
+       return nil
+}
+
+func (m *mockCatalogForDrop) LoadTable(ctx context.Context, identifier 
table.Identifier) (*table.Table, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) DropTable(ctx context.Context, identifier 
table.Identifier) error {
+       m.dropCalled = true
+       m.dropIdent = identifier
+
+       return m.dropErr
+}
+
+func (m *mockCatalogForDrop) RenameTable(ctx context.Context, from, to 
table.Identifier) (*table.Table, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) CheckTableExists(ctx context.Context, identifier 
table.Identifier) (bool, error) {
+       return m.checkExists, m.checkExistsErr
+}
+
+func (m *mockCatalogForDrop) ListNamespaces(ctx context.Context, parent 
table.Identifier) ([]table.Identifier, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) CreateNamespace(ctx context.Context, namespace 
table.Identifier, props iceberg.Properties) error {
+       return nil
+}
+
+func (m *mockCatalogForDrop) DropNamespace(ctx context.Context, namespace 
table.Identifier) error {
+       return nil
+}
+
+func (m *mockCatalogForDrop) CheckNamespaceExists(ctx context.Context, 
namespace table.Identifier) (bool, error) {
+       return false, nil
+}
+
+func (m *mockCatalogForDrop) LoadNamespaceProperties(ctx context.Context, 
namespace table.Identifier) (iceberg.Properties, error) {
+       return nil, nil
+}
+
+func (m *mockCatalogForDrop) UpdateNamespaceProperties(ctx context.Context, 
namespace table.Identifier, removals []string, updates iceberg.Properties) 
(catalog.PropertiesUpdateSummary, error) {
+       return catalog.PropertiesUpdateSummary{}, nil
+}
+
+type mockPurgeableCatalog struct {
+       mockCatalogForDrop
+       purgeCalled bool
+       purgeIdent  table.Identifier
+       purgeErr    error
+}
+
+func (m *mockPurgeableCatalog) PurgeTable(ctx context.Context, identifier 
table.Identifier) error {
+       m.purgeCalled = true
+       m.purgeIdent = identifier
+
+       return m.purgeErr
+}
+
+func TestRunDropTable(t *testing.T) {
+       cat := &mockCatalogForDrop{
+               catalogType: catalog.SQL,
+       }
+
+       cmd := &DropCmd{
+               Table: &DropTableCmd{
+                       Identifier: "db.events",
+                       Purge:      false,
+               },
+       }
+
+       var errOut errCapture
+       runDrop(context.Background(), &errOut, cat, cmd)
+
+       assert.True(t, cat.dropCalled)
+       assert.Equal(t, table.Identifier{"db", "events"}, cat.dropIdent)
+       assert.NoError(t, errOut.lastErr)
+}
+
+func TestRunDropTablePurgeNotSupported(t *testing.T) {
+       cat := &mockCatalogForDrop{
+               catalogType: catalog.SQL,
+       }
+
+       cmd := &DropCmd{
+               Table: &DropTableCmd{
+                       Identifier: "db.events",
+                       Purge:      true,
+               },
+       }
+
+       var errOut errCapture
+       exitCode := captureExit(func() {
+               runDrop(context.Background(), &errOut, cat, cmd)
+       })
+
+       assert.Equal(t, 1, exitCode)
+       require.Error(t, errOut.lastErr)
+       assert.Contains(t, errOut.lastErr.Error(), "does not support purge")
+}
+
+func TestRunDropTablePurgeSupported(t *testing.T) {
+       cat := &mockPurgeableCatalog{
+               mockCatalogForDrop: mockCatalogForDrop{
+                       catalogType: catalog.SQL,
+               },
+       }
+
+       cmd := &DropCmd{
+               Table: &DropTableCmd{
+                       Identifier: "db.events",
+                       Purge:      true,
+               },
+       }
+
+       var errOut errCapture
+       runDrop(context.Background(), &errOut, cat, cmd)
+
+       assert.True(t, cat.purgeCalled)
+       assert.Equal(t, table.Identifier{"db", "events"}, cat.purgeIdent)
+       assert.NoError(t, errOut.lastErr)
+}
+
+func TestRunDropTableError(t *testing.T) {
+       expectedErr := errors.New("some sql error")
+       cat := &mockCatalogForDrop{
+               catalogType: catalog.SQL,
+               dropErr:     expectedErr,
+       }
+
+       cmd := &DropCmd{
+               Table: &DropTableCmd{
+                       Identifier: "db.events",
+                       Purge:      false,
+               },
+       }
+
+       var errOut errCapture
+       exitCode := captureExit(func() {
+               runDrop(context.Background(), &errOut, cat, cmd)
+       })
+
+       assert.Equal(t, 1, exitCode)
+       assert.ErrorIs(t, errOut.lastErr, expectedErr)
+}
+
+func TestRunDropTablePurgeError(t *testing.T) {
+       expectedErr := errors.New("some error")
+       cat := &mockPurgeableCatalog{
+               mockCatalogForDrop: mockCatalogForDrop{
+                       catalogType: catalog.SQL,
+               },
+               purgeErr: expectedErr,
+       }
+
+       cmd := &DropCmd{
+               Table: &DropTableCmd{
+                       Identifier: "db.events",
+                       Purge:      true,
+               },
+       }
+
+       var errOut errCapture
+       exitCode := captureExit(func() {
+               runDrop(context.Background(), &errOut, cat, cmd)
+       })
+
+       assert.Equal(t, 1, exitCode)
+       assert.True(t, cat.purgeCalled)
+       assert.ErrorIs(t, errOut.lastErr, expectedErr)
+}
diff --git a/cmd/iceberg/main.go b/cmd/iceberg/main.go
index fca148fe..a590f0c3 100644
--- a/cmd/iceberg/main.go
+++ b/cmd/iceberg/main.go
@@ -112,6 +112,7 @@ type DropNamespaceCmd struct {
 
 type DropTableCmd struct {
        Identifier string `arg:"positional,required" help:"fully qualified 
table"`
+       Purge      bool   `arg:"--purge" help:"physically delete all table 
files"`
 }
 
 type DropCmd struct {
@@ -534,13 +535,24 @@ func runDrop(ctx context.Context, output Output, cat 
catalog.Catalog, cmd *DropC
                err := cat.DropNamespace(ctx, 
catalog.ToIdentifier(cmd.Namespace.Identifier))
                if err != nil {
                        output.Error(err)
-                       os.Exit(1)
+                       osExit(1)
                }
        case cmd.Table != nil:
-               err := cat.DropTable(ctx, 
catalog.ToIdentifier(cmd.Table.Identifier))
+               ident := catalog.ToIdentifier(cmd.Table.Identifier)
+               var err error
+               if cmd.Table.Purge {
+                       if purger, ok := cat.(catalog.PurgeableTable); ok {
+                               err = purger.PurgeTable(ctx, ident)
+                       } else {
+                               output.Error(fmt.Errorf("catalog %s does not 
support purge", cat.CatalogType()))
+                               osExit(1)
+                       }
+               } else {
+                       err = cat.DropTable(ctx, ident)
+               }
                if err != nil {
                        output.Error(err)
-                       os.Exit(1)
+                       osExit(1)
                }
        }
 }
diff --git a/go.mod b/go.mod
index 937fc5a2..e362ecb7 100644
--- a/go.mod
+++ b/go.mod
@@ -36,6 +36,7 @@ require (
        github.com/beltran/gohive v1.8.1
        github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
        github.com/docker/docker v28.5.2+incompatible
+       github.com/geoarrow/geoarrow-go v0.0.0-20260403143023-f54751c3e3a1
        github.com/google/go-cmp v0.7.0
        github.com/google/uuid v1.6.0
        github.com/hashicorp/golang-lru/v2 v2.0.7
@@ -144,7 +145,6 @@ require (
        github.com/felixge/httpsnoop v1.0.4 // indirect
        github.com/fsnotify/fsevents v0.2.0 // indirect
        github.com/fvbommel/sortorder v1.1.0 // indirect
-       github.com/geoarrow/geoarrow-go v0.0.0-20260403143023-f54751c3e3a1 // 
indirect
        github.com/go-jose/go-jose/v4 v4.1.4 // indirect
        github.com/go-logr/logr v1.4.3 // indirect
        github.com/go-logr/stdr v1.2.2 // indirect
diff --git a/table/orphan_cleanup.go b/table/orphan_cleanup.go
index efc1e21a..7c945228 100644
--- a/table/orphan_cleanup.go
+++ b/table/orphan_cleanup.go
@@ -22,10 +22,13 @@ import (
        "errors"
        "fmt"
        stdfs "io/fs"
+       "log/slog"
        "net/url"
+       "os"
        "path/filepath"
        "reflect"
        "runtime"
+       "slices"
        "strings"
        "sync"
        "time"
@@ -192,7 +195,7 @@ func (t Table) executeOrphanCleanup(ctx context.Context, 
cfg *orphanCleanupConfi
                scanLocation = t.metadata.Location()
        }
 
-       referencedFiles, err := t.getReferencedFiles(fs)
+       referencedFiles, err := t.getReferencedFiles(fs, true)
        if err != nil {
                return OrphanCleanupResult{}, fmt.Errorf("failed to get 
referenced files: %w", err)
        }
@@ -228,36 +231,43 @@ func (t Table) executeOrphanCleanup(ctx context.Context, 
cfg *orphanCleanupConfi
 // getReferencedFiles collects all files referenced by table metadata: 
previous metadata
 // files, statistics and partition-statistics paths (Puffin, etc.), and all 
paths reachable
 // from current snapshots (manifest lists, manifests, data files).
-func (t Table) getReferencedFiles(fs iceio.IO) (map[string]bool, error) {
+//
+// If the table has snapshots, fs must not be nil, otherwise an error is 
returned.
+// Keys are paths normalized by normalizeFilePath; the value is true only for files listed in manifest entries.
+func (t Table) getReferencedFiles(fs iceio.IO, discardDeleted bool) 
(map[string]bool, error) {
        referenced := make(map[string]bool)
        metadata := t.metadata
 
        for entry := range metadata.PreviousFiles() {
-               referenced[entry.MetadataFile] = true
+               referenced[normalizeFilePath(entry.MetadataFile)] = false
        }
-       referenced[t.metadataLocation] = true
+       referenced[normalizeFilePath(t.metadataLocation)] = false
 
        // Add version hint file (for Hadoop-style tables)
        // Following Java's ReachableFileUtil.versionHintLocation() logic:
        versionHintPath := filepath.Join(metadata.Location(), "metadata", 
"version-hint.text")
-       referenced[versionHintPath] = true
+       referenced[normalizeFilePath(versionHintPath)] = false
 
        for sf := range metadata.Statistics() {
                // Guard against malformed metadata; statistics-path is 
required per spec.
                if sf.StatisticsPath != "" {
-                       referenced[sf.StatisticsPath] = true
+                       referenced[normalizeFilePath(sf.StatisticsPath)] = false
                }
        }
        for psf := range metadata.PartitionStatistics() {
                // Guard against malformed metadata; statistics-path is 
required per spec.
                if psf.StatisticsPath != "" {
-                       referenced[psf.StatisticsPath] = true
+                       referenced[normalizeFilePath(psf.StatisticsPath)] = 
false
                }
        }
 
+       if len(metadata.Snapshots()) > 0 && fs == nil {
+               return nil, errors.New("fs cannot be nil when table has 
snapshots")
+       }
+
        for _, snapshot := range metadata.Snapshots() {
                if snapshot.ManifestList != "" {
-                       referenced[snapshot.ManifestList] = true
+                       referenced[normalizeFilePath(snapshot.ManifestList)] = 
false
                }
 
                manifestFiles, err := snapshot.Manifests(fs)
@@ -266,7 +276,7 @@ func (t Table) getReferencedFiles(fs iceio.IO) 
(map[string]bool, error) {
                }
 
                for _, manifest := range manifestFiles {
-                       referenced[manifest.FilePath()] = true
+                       referenced[normalizeFilePath(manifest.FilePath())] = 
false
 
                        // discardDeleted=true: skip DELETED-status entries when
                        // computing the reachable file set. A DELETED entry is
@@ -274,11 +284,19 @@ func (t Table) getReferencedFiles(fs iceio.IO) 
(map[string]bool, error) {
                        // against orphan cleanup once the snapshot that
                        // originally held it live has been expired.
                        // This matches iceberg-java and pyiceberg behavior.
-                       for entry, err := range manifest.Entries(fs, true) {
+                       for entry, err := range manifest.Entries(fs, 
discardDeleted) {
                                if err != nil {
                                        return nil, fmt.Errorf("failed to read 
manifest entries: %w", err)
                                }
-                               referenced[entry.DataFile().FilePath()] = true
+                               // All files tracked within a manifest (data 
files, equality deletes, position deletes)
+                               // are considered "data files" for the purposes 
of gc.enabled.
+                               
referenced[normalizeFilePath(entry.DataFile().FilePath())] = true
+                               if ref := 
entry.DataFile().ReferencedDataFile(); ref != nil {
+                                       // This is a deletion vector entry 
referencing a data file.
+                                       // Its FilePath() is the deletion 
vector (.dv) file itself (added above).
+                                       // We must also mark the referenced 
data file as referenced.
+                                       referenced[normalizeFilePath(*ref)] = 
true
+                               }
                        }
                }
        }
@@ -396,7 +414,7 @@ func makeFileWalkFunc(fn func(path string, info 
stdfs.FileInfo) error, pathTrans
 func identifyOrphanFiles(allFiles []string, referencedFiles map[string]bool, 
cfg *orphanCleanupConfig) ([]string, error) {
        normalizedReferencedFiles := make(map[string]string)
        for refPath := range referencedFiles {
-               normalizedPath := normalizeFilePath(refPath, cfg)
+               normalizedPath := normalizeFilePathWithConfig(refPath, cfg)
                normalizedReferencedFiles[normalizedPath] = refPath
                // Also include the original path for direct lookup
                normalizedReferencedFiles[refPath] = refPath
@@ -419,7 +437,7 @@ func identifyOrphanFiles(allFiles []string, referencedFiles 
map[string]bool, cfg
 }
 
 func isFileOrphan(file string, referencedFiles map[string]bool, 
normalizedReferencedFiles map[string]string, cfg *orphanCleanupConfig) (bool, 
error) {
-       normalizedFile := normalizeFilePath(file, cfg)
+       normalizedFile := normalizeFilePathWithConfig(file, cfg)
 
        if referencedFiles[file] || referencedFiles[normalizedFile] {
                return false, nil
@@ -547,13 +565,37 @@ func deleteFilesParallel(fs iceio.IO, orphanFiles 
[]string, cfg *orphanCleanupCo
 //  3. Path separators and casing may vary across different systems and 
configurations
 //  4. Without normalization, semantically identical paths would be treated as 
different,
 //     leading to false positives in orphan detection
-func normalizeFilePath(path string, cfg *orphanCleanupConfig) string {
+//
+// normalizeFilePath normalizes a file path for comparison, handling schemes, 
authorities, and separators.
+// It also aligns file:// URIs and bare local file system paths so they 
normalize to the same format.
+func normalizeFilePath(path string) string {
+       return normalizeFilePathWithConfig(path, nil)
+}
+
+func normalizeFilePathWithConfig(path string, cfg *orphanCleanupConfig) string 
{
+       if strings.HasPrefix(path, "file:") {
+               if u, err := url.Parse(path); err == nil {
+                       host := strings.ToLower(u.Host)
+                       if host == "" || host == "localhost" {
+                               pathStr := u.Path
+                               // Intercept Windows drive letters (e.g., /C:/) 
and strip the leading slash
+                               if len(pathStr) >= 3 && pathStr[0] == '/' && 
pathStr[2] == ':' {
+                                       pathStr = pathStr[1:]
+                               }
+
+                               return filepath.Clean(pathStr)
+                       }
+                       // Remote authority – prepend the host; note Clean collapses the leading "//" on non-Windows
+                       return filepath.Clean("//" + u.Host + u.Path)
+               }
+       }
+
        // Handle URL-based paths (s3://, gs://, etc.)
        if strings.Contains(path, "://") {
                return normalizeURLPath(path, cfg)
-       } else {
-               return normalizeNonURLPath(path)
        }
+
+       return normalizeNonURLPath(path)
 }
 
 // normalizeURLPath normalizes URL-based file paths with scheme/authority 
equivalence.
@@ -577,8 +619,15 @@ func normalizeURLPath(path string, cfg 
*orphanCleanupConfig) string {
                return normalizeNonURLPath(path)
        }
 
-       normalizedScheme := applySchemeEquivalence(parsedURL.Scheme, 
cfg.equalSchemes)
-       normalizedAuthority := applyAuthorityEquivalence(parsedURL.Host, 
cfg.equalAuthorities)
+       var equalSchemes map[string]string
+       var equalAuthorities map[string]string
+       if cfg != nil {
+               equalSchemes = cfg.equalSchemes
+               equalAuthorities = cfg.equalAuthorities
+       }
+
+       normalizedScheme := applySchemeEquivalence(parsedURL.Scheme, 
equalSchemes)
+       normalizedAuthority := applyAuthorityEquivalence(parsedURL.Host, 
equalAuthorities)
        normalizedURL := &url.URL{
                Scheme: normalizedScheme,
                Host:   normalizedAuthority,
@@ -708,3 +757,105 @@ func checkPrefixMismatch(referencedPath, filesystemPath 
string, cfg *orphanClean
                return fmt.Errorf("unknown prefix mismatch mode: %d", 
cfg.prefixMismatchMode)
        }
 }
+
+// PurgeFiles physically deletes all files under the table's warehouse location
+// and any referenced files written outside the location root (e.g., via 
write.data.path
+// or write.metadata.path properties).
+//
+// It operates on a best-effort basis. Errors from walking the location and from
+// deleting files (e.g. permission errors) are collected and returned together;
+// not-exist errors from the walk and from per-file removal are ignored. These
+// errors are not logged here: a caller such as a catalog drop should log them
+// and typically proceed so the catalog does not get out of sync with storage.
+func (t Table) PurgeFiles(ctx context.Context) error {
+       gcEnabled := t.Metadata().Properties().GetBool("gc.enabled", true)
+
+       fs, err := t.FS(ctx)
+       if err != nil {
+               return fmt.Errorf("failed to load filesystem for table purge: 
%w", err)
+       }
+
+       var errs []error
+       fileSet := make(map[string]string)
+       location := t.metadata.Location()
+
+       // 1. Walk the table location directory tree to capture all local files
+       // Only walk the directory if gc.enabled=true to prevent accidental 
deletion
+       // of unreferenced branched data files.
+       if gcEnabled {
+               if listable, ok := fs.(iceio.ListableIO); ok {
+                       walkErr := listable.WalkDir(location, func(path string, 
d stdfs.DirEntry, err error) error {
+                               if err := ctx.Err(); err != nil {
+                                       return err
+                               }
+                               if err != nil {
+                                       if os.IsNotExist(err) || errors.Is(err, 
stdfs.ErrNotExist) {
+                                               return nil
+                                       }
+
+                                       return err
+                               }
+                               if !d.IsDir() {
+                                       fileSet[normalizeFilePath(path)] = path
+                               }
+
+                               return nil
+                       })
+                       if walkErr != nil && !os.IsNotExist(walkErr) && 
!errors.Is(walkErr, stdfs.ErrNotExist) {
+                               errs = append(errs, fmt.Errorf("failed walking 
directory %s: %w", location, walkErr))
+                       }
+               }
+       }
+
+       // 2. Union in manifest-referenced and metadata files (which might be 
outside the table location)
+       referencedFiles, refErr := t.getReferencedFiles(fs, false)
+       if refErr != nil {
+               return fmt.Errorf("failed to get referenced files: %w", refErr)
+       }
+
+       for path, isData := range referencedFiles {
+               if !gcEnabled && isData {
+                       slog.WarnContext(ctx, "purge: skipping data file, 
gc.enabled=false", "path", path)
+
+                       continue
+               }
+
+               norm := normalizeFilePath(path)
+               if _, ok := fileSet[norm]; !ok {
+                       fileSet[norm] = path
+               }
+       }
+
+       // Convert to slice and sort for deterministic behavior
+       files := make([]string, 0, len(fileSet))
+       for _, orig := range fileSet {
+               files = append(files, orig)
+       }
+       slices.Sort(files)
+
+       if len(files) > 0 {
+               if bulk, ok := fs.(iceio.BulkRemovableIO); ok {
+                       _, bulkErr := bulk.DeleteFiles(ctx, files)
+                       if bulkErr != nil {
+                               errs = append(errs, fmt.Errorf("bulk deletion 
failed: %w", bulkErr))
+                       }
+               } else {
+                       for _, file := range files {
+                               if err := ctx.Err(); err != nil {
+                                       errs = append(errs, err)
+
+                                       break
+                               }
+                               if rmErr := fs.Remove(file); rmErr != nil && 
!os.IsNotExist(rmErr) {
+                                       errs = append(errs, fmt.Errorf("failed 
to remove %s: %w", file, rmErr))
+                               }
+                       }
+               }
+       }
+
+       if len(errs) > 0 {
+               return errors.Join(errs...)
+       }
+
+       return nil
+}
diff --git a/table/orphan_cleanup_test.go b/table/orphan_cleanup_test.go
index 43b01266..e77f90db 100644
--- a/table/orphan_cleanup_test.go
+++ b/table/orphan_cleanup_test.go
@@ -117,7 +117,7 @@ func TestNormalizeFilePath(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       result := normalizeFilePath(tt.input, cfg)
+                       result := normalizeFilePathWithConfig(tt.input, cfg)
                        assert.Equal(t, tt.expected, result)
                })
        }
@@ -365,7 +365,7 @@ func TestIsFileOrphan(t *testing.T) {
        }
        normalizedReferencedFiles := make(map[string]string)
        for refPath := range referencedFiles {
-               normalizedPath := normalizeFilePath(refPath, cfg)
+               normalizedPath := normalizeFilePathWithConfig(refPath, cfg)
                normalizedReferencedFiles[normalizedPath] = refPath
        }
 
@@ -529,13 +529,13 @@ func TestGetReferencedFiles_IncludesStatisticsFiles(t 
*testing.T) {
        }
 
        // No snapshots: FileIO is not used; statistics paths must still be 
referenced.
-       refs, err := tbl.getReferencedFiles(nil)
+       refs, err := tbl.getReferencedFiles(nil, true)
        require.NoError(t, err)
 
-       assert.True(t, refs["s3://bucket/stats/table-stats.puffin"])
-       assert.True(t, refs["s3://bucket/stats/part-stats.puffin"])
-       assert.True(t, refs[tbl.metadataLocation])
-       assert.False(t, refs["s3://bucket/stats/not-referenced.puffin"])
+       assert.Contains(t, refs, 
normalizeFilePath("s3://bucket/stats/table-stats.puffin"))
+       assert.Contains(t, refs, 
normalizeFilePath("s3://bucket/stats/part-stats.puffin"))
+       assert.Contains(t, refs, normalizeFilePath(tbl.metadataLocation))
+       assert.False(t, 
refs[normalizeFilePath("s3://bucket/stats/not-referenced.puffin")])
        assert.False(t, refs[""])
 }
 
@@ -716,12 +716,12 @@ func 
TestGetReferencedFiles_OverwriteThenExpireExcludesTombstones(t *testing.T)
 
        // fileA is now referenced only via a DELETED entry in the surviving
        // snapshot's tombstone manifest. The fix must exclude it.
-       refs, err := tbl.getReferencedFiles(fs)
+       refs, err := tbl.getReferencedFiles(fs, true)
        require.NoError(t, err)
 
-       assert.True(t, refs[fileB],
+       assert.Contains(t, refs, normalizeFilePath(fileB),
                "new live file (ADDED in surviving snapshot) must be in 
reference set")
-       assert.False(t, refs[fileA],
+       assert.NotContains(t, refs, normalizeFilePath(fileA),
                "overwritten file (only present as DELETED tombstone) must NOT 
be in reference set")
 }
 


Reply via email to