zeroshade commented on code in PR #498:
URL: https://github.com/apache/iceberg-go/pull/498#discussion_r2231759880


##########
table/manifest_operations_test.go:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+)
+
+type ManifestOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestManifestOperations(t *testing.T) {
+       suite.Run(t, new(ManifestOperationsTestSuite))
+}
+
+func (s *ManifestOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "value", Type: 
iceberg.PrimitiveTypes.Int32})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "value", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "value": 100},
+                       {"id": 2, "data": "bar", "value": 200},
+                       {"id": 3, "data": "baz", "value": 300}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *ManifestOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *ManifestOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *ManifestOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *ManifestOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *ManifestOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec, props iceberg.Properties) 
*table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, props)
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *ManifestOperationsTestSuite) getFS(tbl *table.Table) 
iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+func (s *ManifestOperationsTestSuite) createTableWithManyFiles(identifier 
table.Identifier, numFiles int, enableMerge bool) (*table.Table, []string) {
+       // Configure table properties for manifest merging
+       props := iceberg.Properties{"format-version": "2"}
+       if enableMerge {
+               // Enable manifest merging with aggressive settings for testing
+               props[table.ManifestMergeEnabledKey] = "true"
+               props[table.ManifestMinMergeCountKey] = "2"      // Merge when 
we have 2+ manifests
+               props[table.ManifestTargetSizeBytesKey] = "4096" // Small 
target size for testing
+       } else {
+               props[table.ManifestMergeEnabledKey] = "false"
+       }
+
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec, props)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       // Create and add files one by one to generate multiple manifest files
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+
+               // Add each file in a separate transaction to create multiple 
manifests
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{filePath}, nil, 
false))
+
+               committedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+               tbl = committedTbl
+       }
+
+       return tbl, files
+}
+
+func TestRewriteManifests(t *testing.T) {
+       suite.Run(t, &ManifestOperationsTestSuite{})
+}
+
+func (s *ManifestOperationsTestSuite) TestRewriteManifests() {
+       s.Run("ConsolidateManifests", func() {

Review Comment:
   there's no shared infrastructure, just make this the function and split the 
other subtests into their own functions



##########
table/manifest_operations_test.go:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+)
+
+type ManifestOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestManifestOperations(t *testing.T) {
+       suite.Run(t, new(ManifestOperationsTestSuite))
+}
+
+func (s *ManifestOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "value", Type: 
iceberg.PrimitiveTypes.Int32})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "value", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "value": 100},
+                       {"id": 2, "data": "bar", "value": 200},
+                       {"id": 3, "data": "baz", "value": 300}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *ManifestOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *ManifestOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *ManifestOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *ManifestOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *ManifestOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec, props iceberg.Properties) 
*table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, props)
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *ManifestOperationsTestSuite) getFS(tbl *table.Table) 
iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+func (s *ManifestOperationsTestSuite) createTableWithManyFiles(identifier 
table.Identifier, numFiles int, enableMerge bool) (*table.Table, []string) {
+       // Configure table properties for manifest merging
+       props := iceberg.Properties{"format-version": "2"}
+       if enableMerge {
+               // Enable manifest merging with aggressive settings for testing
+               props[table.ManifestMergeEnabledKey] = "true"
+               props[table.ManifestMinMergeCountKey] = "2"      // Merge when 
we have 2+ manifests
+               props[table.ManifestTargetSizeBytesKey] = "4096" // Small 
target size for testing
+       } else {
+               props[table.ManifestMergeEnabledKey] = "false"
+       }
+
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec, props)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       // Create and add files one by one to generate multiple manifest files
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+
+               // Add each file in a separate transaction to create multiple 
manifests
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{filePath}, nil, 
false))
+
+               committedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+               tbl = committedTbl
+       }
+
+       return tbl, files
+}
+
+func TestRewriteManifests(t *testing.T) {
+       suite.Run(t, &ManifestOperationsTestSuite{})
+}
+
+func (s *ManifestOperationsTestSuite) TestRewriteManifests() {
+       s.Run("ConsolidateManifests", func() {
+               // Test consolidating multiple small manifests
+               ident := table.Identifier{"default", 
"consolidate_manifests_test"}
+
+               // Create table with manifest merging disabled first to create 
multiple manifests
+               tblWithoutMerge, _ := s.createTableWithManyFiles(ident, 5, 
false)
+
+               // Get current snapshot to examine manifests
+               snapshot := tblWithoutMerge.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               fs := s.getFS(tblWithoutMerge)
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               initialManifestCount := len(manifests)
+               s.T().Logf("Initial manifest count: %d", initialManifestCount)
+
+               // Verify we have multiple manifests (one per file add 
operation)
+               s.GreaterOrEqual(initialManifestCount, 5, "Should have multiple 
manifests from separate transactions")
+
+               // Now enable manifest merging and perform an operation to 
trigger consolidation
+               ident2 := table.Identifier{"default", 
"consolidate_manifests_merged"}
+               tblWithMerge := s.createTable(ident2, 2, 
*iceberg.UnpartitionedSpec, iceberg.Properties{
+                       "format-version":                 "2",
+                       table.ManifestMergeEnabledKey:    "true",
+                       table.ManifestMinMergeCountKey:   "2",    // Merge when 
2+ manifests
+                       table.ManifestTargetSizeBytesKey: "8192", // Small 
target for testing
+               })
+
+               // Add multiple files in one transaction with merging enabled
+               files := make([]string, 0, 5)
+               for i := 0; i < 5; i++ {
+                       filePath := fmt.Sprintf("%s/data/merged-%d.parquet", 
s.location, i)
+                       s.writeParquet(s.getFS(tblWithMerge), filePath, 
s.arrTable)
+                       files = append(files, filePath)
+               }
+
+               tx := tblWithMerge.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false))
+
+               mergedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Check manifests after merging
+               mergedSnapshot := mergedTbl.CurrentSnapshot()
+               s.Require().NotNil(mergedSnapshot)
+
+               mergedManifests, err := 
mergedSnapshot.Manifests(s.getFS(mergedTbl))
+               s.Require().NoError(err)
+
+               mergedManifestCount := len(mergedManifests)
+               s.T().Logf("Merged manifest count: %d", mergedManifestCount)
+
+               // Verify manifest count reduction (should have fewer manifests 
due to merging)
+               s.LessOrEqual(mergedManifestCount, 3, "Manifest merging should 
reduce manifest count")
+
+               // Verify data integrity
+               scan := mergedTbl.Scan()
+               results, err := scan.ToArrowTable(s.ctx)
+               s.Require().NoError(err)
+               defer results.Release()
+
+               s.Equal(int64(15), results.NumRows(), "Should have all data 
after manifest consolidation")
+
+               s.T().Log("Successfully consolidated manifests and verified 
data integrity")
+       })
+
+       s.Run("RewriteManifestsWithDeletes", func() {

Review Comment:
   make this a separate function



##########
table/manifest_operations_test.go:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+)
+
+type ManifestOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestManifestOperations(t *testing.T) {
+       suite.Run(t, new(ManifestOperationsTestSuite))
+}
+
+func (s *ManifestOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "value", Type: 
iceberg.PrimitiveTypes.Int32})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "value", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "value": 100},
+                       {"id": 2, "data": "bar", "value": 200},
+                       {"id": 3, "data": "baz", "value": 300}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *ManifestOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *ManifestOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *ManifestOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *ManifestOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *ManifestOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec, props iceberg.Properties) 
*table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, props)
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *ManifestOperationsTestSuite) getFS(tbl *table.Table) 
iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+func (s *ManifestOperationsTestSuite) createTableWithManyFiles(identifier 
table.Identifier, numFiles int, enableMerge bool) (*table.Table, []string) {
+       // Configure table properties for manifest merging
+       props := iceberg.Properties{"format-version": "2"}
+       if enableMerge {
+               // Enable manifest merging with aggressive settings for testing
+               props[table.ManifestMergeEnabledKey] = "true"
+               props[table.ManifestMinMergeCountKey] = "2"      // Merge when 
we have 2+ manifests
+               props[table.ManifestTargetSizeBytesKey] = "4096" // Small 
target size for testing
+       } else {
+               props[table.ManifestMergeEnabledKey] = "false"
+       }
+
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec, props)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       // Create and add files one by one to generate multiple manifest files
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+
+               // Add each file in a separate transaction to create multiple 
manifests
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{filePath}, nil, 
false))
+
+               committedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+               tbl = committedTbl
+       }
+
+       return tbl, files
+}
+
+func TestRewriteManifests(t *testing.T) {
+       suite.Run(t, &ManifestOperationsTestSuite{})
+}
+
+func (s *ManifestOperationsTestSuite) TestRewriteManifests() {
+       s.Run("ConsolidateManifests", func() {
+               // Test consolidating multiple small manifests
+               ident := table.Identifier{"default", 
"consolidate_manifests_test"}
+
+               // Create table with manifest merging disabled first to create 
multiple manifests
+               tblWithoutMerge, _ := s.createTableWithManyFiles(ident, 5, 
false)
+
+               // Get current snapshot to examine manifests
+               snapshot := tblWithoutMerge.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               fs := s.getFS(tblWithoutMerge)
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               initialManifestCount := len(manifests)
+               s.T().Logf("Initial manifest count: %d", initialManifestCount)
+
+               // Verify we have multiple manifests (one per file add 
operation)
+               s.GreaterOrEqual(initialManifestCount, 5, "Should have multiple 
manifests from separate transactions")
+
+               // Now enable manifest merging and perform an operation to 
trigger consolidation
+               ident2 := table.Identifier{"default", 
"consolidate_manifests_merged"}
+               tblWithMerge := s.createTable(ident2, 2, 
*iceberg.UnpartitionedSpec, iceberg.Properties{
+                       "format-version":                 "2",
+                       table.ManifestMergeEnabledKey:    "true",
+                       table.ManifestMinMergeCountKey:   "2",    // Merge when 
2+ manifests
+                       table.ManifestTargetSizeBytesKey: "8192", // Small 
target for testing
+               })
+
+               // Add multiple files in one transaction with merging enabled
+               files := make([]string, 0, 5)
+               for i := 0; i < 5; i++ {
+                       filePath := fmt.Sprintf("%s/data/merged-%d.parquet", 
s.location, i)
+                       s.writeParquet(s.getFS(tblWithMerge), filePath, 
s.arrTable)
+                       files = append(files, filePath)
+               }
+
+               tx := tblWithMerge.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false))
+
+               mergedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Check manifests after merging
+               mergedSnapshot := mergedTbl.CurrentSnapshot()
+               s.Require().NotNil(mergedSnapshot)
+
+               mergedManifests, err := 
mergedSnapshot.Manifests(s.getFS(mergedTbl))
+               s.Require().NoError(err)
+
+               mergedManifestCount := len(mergedManifests)
+               s.T().Logf("Merged manifest count: %d", mergedManifestCount)
+
+               // Verify manifest count reduction (should have fewer manifests 
due to merging)
+               s.LessOrEqual(mergedManifestCount, 3, "Manifest merging should 
reduce manifest count")
+
+               // Verify data integrity
+               scan := mergedTbl.Scan()
+               results, err := scan.ToArrowTable(s.ctx)
+               s.Require().NoError(err)
+               defer results.Release()
+
+               s.Equal(int64(15), results.NumRows(), "Should have all data 
after manifest consolidation")
+
+               s.T().Log("Successfully consolidated manifests and verified 
data integrity")
+       })
+
+       s.Run("RewriteManifestsWithDeletes", func() {
+               // Test rewriting manifests that contain delete files
+               ident := table.Identifier{"default", 
"rewrite_with_deletes_test"}
+               tbl, originalFiles := s.createTableWithManyFiles(ident, 3, 
false)
+
+               // Create position delete files
+               mem := memory.DefaultAllocator
+               posDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "file_path", Type: arrow.BinaryTypes.String, 
Nullable: false},
+                       {Name: "pos", Type: arrow.PrimitiveTypes.Int32, 
Nullable: false},
+               }, nil)
+
+               // Create delete data for first file, deleting position 1
+               deleteData, err := array.TableFromJSON(mem, posDeleteSchema, 
[]string{
+                       fmt.Sprintf(`[{"file_path": "%s", "pos": 1}]`, 
originalFiles[0]),
+               })
+               s.Require().NoError(err)
+               defer deleteData.Release()
+
+               // Write position delete file
+               deleteFilePath := s.location + "/deletes/pos_deletes.parquet"
+               fs := s.getFS(tbl)
+               s.writeParquet(fs, deleteFilePath, deleteData)
+
+               // Create equality delete file
+               eqDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "id", Type: arrow.PrimitiveTypes.Int64, 
Nullable: false},
+               }, nil)
+
+               eqDeleteData, err := array.TableFromJSON(mem, eqDeleteSchema, 
[]string{
+                       `[{"id": 2}]`, // Delete row with id=2
+               })
+               s.Require().NoError(err)
+               defer eqDeleteData.Release()
+
+               eqDeleteFilePath := s.location + "/deletes/eq_deletes.parquet"
+               s.writeParquet(fs, eqDeleteFilePath, eqDeleteData)
+
+               // Now perform an operation that would trigger manifest 
rewriting
+               // Add another file to trigger potential manifest consolidation
+               newFilePath := s.location + "/data/additional.parquet"
+               s.writeParquet(fs, newFilePath, s.arrTable)
+
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{newFilePath}, 
nil, false))
+
+               updatedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Verify proper handling of delete file references
+               snapshot := updatedTbl.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               s.Greater(len(manifests), 0, "Should have manifest files")
+
+               // Verify that manifests can be read properly (structure 
validation)
+               for _, manifest := range manifests {
+                       entries, err := manifest.FetchEntries(fs, false)
+                       s.Require().NoError(err)
+                       s.GreaterOrEqual(len(entries), 0, "Should be able to 
read manifest entries")
+
+                       // Log manifest contents for verification
+                       s.T().Logf("Manifest %s has %d entries, content type: 
%v",
+                               manifest.FilePath(), len(entries), 
manifest.ManifestContent())
+               }
+
+               s.T().Log("Successfully handled manifests with delete file 
references")

Review Comment:
   do we really need the log lines?



##########
table/table_test.go:
##########
@@ -1243,8 +1243,8 @@ func TestNullableStructRequiredField(t *testing.T) {
        sc, err := table.ArrowSchemaToIcebergWithFreshIDs(arrowSchema, false)
        require.NoError(t, err)
 
-       require.NoError(t, cat.CreateNamespace(t.Context(), 
table.Identifier{"testing"}, nil))
-       tbl, err := cat.CreateTable(t.Context(), table.Identifier{"testing", 
"nullable_struct_required_field"}, sc,
+       require.NoError(t, cat.CreateNamespace(context.Background(), 
table.Identifier{"testing"}, nil))
+       tbl, err := cat.CreateTable(context.Background(), 
table.Identifier{"testing", "nullable_struct_required_field"}, sc,

Review Comment:
   make a `ctx` variable set to `context.Background()` and just reuse it



##########
table/manifest_operations_test.go:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+)
+
+type ManifestOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestManifestOperations(t *testing.T) {
+       suite.Run(t, new(ManifestOperationsTestSuite))
+}
+
+func (s *ManifestOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "value", Type: 
iceberg.PrimitiveTypes.Int32})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "value", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "value": 100},
+                       {"id": 2, "data": "bar", "value": 200},
+                       {"id": 3, "data": "baz", "value": 300}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *ManifestOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *ManifestOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *ManifestOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *ManifestOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *ManifestOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec, props iceberg.Properties) 
*table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, props)
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *ManifestOperationsTestSuite) getFS(tbl *table.Table) 
iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+func (s *ManifestOperationsTestSuite) createTableWithManyFiles(identifier 
table.Identifier, numFiles int, enableMerge bool) (*table.Table, []string) {
+       // Configure table properties for manifest merging
+       props := iceberg.Properties{"format-version": "2"}
+       if enableMerge {
+               // Enable manifest merging with aggressive settings for testing
+               props[table.ManifestMergeEnabledKey] = "true"
+               props[table.ManifestMinMergeCountKey] = "2"      // Merge when 
we have 2+ manifests
+               props[table.ManifestTargetSizeBytesKey] = "4096" // Small 
target size for testing
+       } else {
+               props[table.ManifestMergeEnabledKey] = "false"
+       }
+
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec, props)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       // Create and add files one by one to generate multiple manifest files
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+
+               // Add each file in a separate transaction to create multiple 
manifests
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{filePath}, nil, 
false))
+
+               committedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+               tbl = committedTbl
+       }
+
+       return tbl, files
+}
+
+func TestRewriteManifests(t *testing.T) {
+       suite.Run(t, &ManifestOperationsTestSuite{})
+}
+
+func (s *ManifestOperationsTestSuite) TestRewriteManifests() {
+       s.Run("ConsolidateManifests", func() {
+               // Test consolidating multiple small manifests
+               ident := table.Identifier{"default", 
"consolidate_manifests_test"}
+
+               // Create table with manifest merging disabled first to create 
multiple manifests
+               tblWithoutMerge, _ := s.createTableWithManyFiles(ident, 5, 
false)
+
+               // Get current snapshot to examine manifests
+               snapshot := tblWithoutMerge.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               fs := s.getFS(tblWithoutMerge)
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               initialManifestCount := len(manifests)
+               s.T().Logf("Initial manifest count: %d", initialManifestCount)
+
+               // Verify we have multiple manifests (one per file add 
operation)
+               s.GreaterOrEqual(initialManifestCount, 5, "Should have multiple 
manifests from separate transactions")
+
+               // Now enable manifest merging and perform an operation to 
trigger consolidation
+               ident2 := table.Identifier{"default", 
"consolidate_manifests_merged"}
+               tblWithMerge := s.createTable(ident2, 2, 
*iceberg.UnpartitionedSpec, iceberg.Properties{
+                       "format-version":                 "2",
+                       table.ManifestMergeEnabledKey:    "true",
+                       table.ManifestMinMergeCountKey:   "2",    // Merge when 
2+ manifests
+                       table.ManifestTargetSizeBytesKey: "8192", // Small 
target for testing
+               })
+
+               // Add multiple files in one transaction with merging enabled
+               files := make([]string, 0, 5)
+               for i := 0; i < 5; i++ {
+                       filePath := fmt.Sprintf("%s/data/merged-%d.parquet", 
s.location, i)
+                       s.writeParquet(s.getFS(tblWithMerge), filePath, 
s.arrTable)
+                       files = append(files, filePath)
+               }
+
+               tx := tblWithMerge.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false))
+
+               mergedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Check manifests after merging
+               mergedSnapshot := mergedTbl.CurrentSnapshot()
+               s.Require().NotNil(mergedSnapshot)
+
+               mergedManifests, err := 
mergedSnapshot.Manifests(s.getFS(mergedTbl))
+               s.Require().NoError(err)
+
+               mergedManifestCount := len(mergedManifests)
+               s.T().Logf("Merged manifest count: %d", mergedManifestCount)
+
+               // Verify manifest count reduction (should have fewer manifests 
due to merging)
+               s.LessOrEqual(mergedManifestCount, 3, "Manifest merging should 
reduce manifest count")
+
+               // Verify data integrity
+               scan := mergedTbl.Scan()
+               results, err := scan.ToArrowTable(s.ctx)
+               s.Require().NoError(err)
+               defer results.Release()
+
+               s.Equal(int64(15), results.NumRows(), "Should have all data 
after manifest consolidation")
+
+               s.T().Log("Successfully consolidated manifests and verified 
data integrity")
+       })
+
+       s.Run("RewriteManifestsWithDeletes", func() {
+               // Test rewriting manifests that contain delete files
+               ident := table.Identifier{"default", 
"rewrite_with_deletes_test"}
+               tbl, originalFiles := s.createTableWithManyFiles(ident, 3, 
false)
+
+               // Create position delete files
+               mem := memory.DefaultAllocator
+               posDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "file_path", Type: arrow.BinaryTypes.String, 
Nullable: false},
+                       {Name: "pos", Type: arrow.PrimitiveTypes.Int32, 
Nullable: false},
+               }, nil)
+
+               // Create delete data for first file, deleting position 1
+               deleteData, err := array.TableFromJSON(mem, posDeleteSchema, 
[]string{
+                       fmt.Sprintf(`[{"file_path": "%s", "pos": 1}]`, 
originalFiles[0]),
+               })
+               s.Require().NoError(err)
+               defer deleteData.Release()
+
+               // Write position delete file
+               deleteFilePath := s.location + "/deletes/pos_deletes.parquet"
+               fs := s.getFS(tbl)
+               s.writeParquet(fs, deleteFilePath, deleteData)
+
+               // Create equality delete file
+               eqDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "id", Type: arrow.PrimitiveTypes.Int64, 
Nullable: false},
+               }, nil)
+
+               eqDeleteData, err := array.TableFromJSON(mem, eqDeleteSchema, 
[]string{
+                       `[{"id": 2}]`, // Delete row with id=2
+               })
+               s.Require().NoError(err)
+               defer eqDeleteData.Release()
+
+               eqDeleteFilePath := s.location + "/deletes/eq_deletes.parquet"
+               s.writeParquet(fs, eqDeleteFilePath, eqDeleteData)
+
+               // Now perform an operation that would trigger manifest 
rewriting
+               // Add another file to trigger potential manifest consolidation
+               newFilePath := s.location + "/data/additional.parquet"
+               s.writeParquet(fs, newFilePath, s.arrTable)
+
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{newFilePath}, 
nil, false))
+
+               updatedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Verify proper handling of delete file references
+               snapshot := updatedTbl.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               s.Greater(len(manifests), 0, "Should have manifest files")
+
+               // Verify that manifests can be read properly (structure 
validation)
+               for _, manifest := range manifests {
+                       entries, err := manifest.FetchEntries(fs, false)
+                       s.Require().NoError(err)
+                       s.GreaterOrEqual(len(entries), 0, "Should be able to 
read manifest entries")
+
+                       // Log manifest contents for verification
+                       s.T().Logf("Manifest %s has %d entries, content type: 
%v",
+                               manifest.FilePath(), len(entries), 
manifest.ManifestContent())
+               }
+
+               s.T().Log("Successfully handled manifests with delete file 
references")
+       })
+}
+
+func (s *ManifestOperationsTestSuite) TestManifestCleanup() {
+       s.Run("RemoveUnreferencedManifests", func() {

Review Comment:
   same as above



##########
table/manifest_operations_test.go:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+)
+
+type ManifestOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestManifestOperations(t *testing.T) {
+       suite.Run(t, new(ManifestOperationsTestSuite))
+}
+
+func (s *ManifestOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "value", Type: 
iceberg.PrimitiveTypes.Int32})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "value", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "value": 100},
+                       {"id": 2, "data": "bar", "value": 200},
+                       {"id": 3, "data": "baz", "value": 300}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *ManifestOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *ManifestOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *ManifestOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *ManifestOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *ManifestOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec, props iceberg.Properties) 
*table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, props)
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *ManifestOperationsTestSuite) getFS(tbl *table.Table) 
iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+func (s *ManifestOperationsTestSuite) createTableWithManyFiles(identifier 
table.Identifier, numFiles int, enableMerge bool) (*table.Table, []string) {
+       // Configure table properties for manifest merging
+       props := iceberg.Properties{"format-version": "2"}
+       if enableMerge {
+               // Enable manifest merging with aggressive settings for testing
+               props[table.ManifestMergeEnabledKey] = "true"
+               props[table.ManifestMinMergeCountKey] = "2"      // Merge when 
we have 2+ manifests
+               props[table.ManifestTargetSizeBytesKey] = "4096" // Small 
target size for testing
+       } else {
+               props[table.ManifestMergeEnabledKey] = "false"
+       }
+
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec, props)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       // Create and add files one by one to generate multiple manifest files
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+
+               // Add each file in a separate transaction to create multiple 
manifests
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{filePath}, nil, 
false))
+
+               committedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+               tbl = committedTbl
+       }
+
+       return tbl, files
+}
+
+func TestRewriteManifests(t *testing.T) {
+       suite.Run(t, &ManifestOperationsTestSuite{})
+}
+
+func (s *ManifestOperationsTestSuite) TestRewriteManifests() {
+       s.Run("ConsolidateManifests", func() {
+               // Test consolidating multiple small manifests
+               ident := table.Identifier{"default", 
"consolidate_manifests_test"}
+
+               // Create table with manifest merging disabled first to create 
multiple manifests
+               tblWithoutMerge, _ := s.createTableWithManyFiles(ident, 5, 
false)
+
+               // Get current snapshot to examine manifests
+               snapshot := tblWithoutMerge.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               fs := s.getFS(tblWithoutMerge)
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               initialManifestCount := len(manifests)
+               s.T().Logf("Initial manifest count: %d", initialManifestCount)
+
+               // Verify we have multiple manifests (one per file add 
operation)
+               s.GreaterOrEqual(initialManifestCount, 5, "Should have multiple 
manifests from separate transactions")
+
+               // Now enable manifest merging and perform an operation to 
trigger consolidation
+               ident2 := table.Identifier{"default", 
"consolidate_manifests_merged"}
+               tblWithMerge := s.createTable(ident2, 2, 
*iceberg.UnpartitionedSpec, iceberg.Properties{
+                       "format-version":                 "2",
+                       table.ManifestMergeEnabledKey:    "true",
+                       table.ManifestMinMergeCountKey:   "2",    // Merge when 
2+ manifests
+                       table.ManifestTargetSizeBytesKey: "8192", // Small 
target for testing
+               })
+
+               // Add multiple files in one transaction with merging enabled
+               files := make([]string, 0, 5)
+               for i := 0; i < 5; i++ {
+                       filePath := fmt.Sprintf("%s/data/merged-%d.parquet", 
s.location, i)
+                       s.writeParquet(s.getFS(tblWithMerge), filePath, 
s.arrTable)
+                       files = append(files, filePath)
+               }
+
+               tx := tblWithMerge.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false))
+
+               mergedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Check manifests after merging
+               mergedSnapshot := mergedTbl.CurrentSnapshot()
+               s.Require().NotNil(mergedSnapshot)
+
+               mergedManifests, err := 
mergedSnapshot.Manifests(s.getFS(mergedTbl))
+               s.Require().NoError(err)
+
+               mergedManifestCount := len(mergedManifests)
+               s.T().Logf("Merged manifest count: %d", mergedManifestCount)
+
+               // Verify manifest count reduction (should have fewer manifests 
due to merging)
+               s.LessOrEqual(mergedManifestCount, 3, "Manifest merging should 
reduce manifest count")
+
+               // Verify data integrity
+               scan := mergedTbl.Scan()
+               results, err := scan.ToArrowTable(s.ctx)
+               s.Require().NoError(err)
+               defer results.Release()
+
+               s.Equal(int64(15), results.NumRows(), "Should have all data 
after manifest consolidation")
+
+               s.T().Log("Successfully consolidated manifests and verified 
data integrity")
+       })
+
+       s.Run("RewriteManifestsWithDeletes", func() {
+               // Test rewriting manifests that contain delete files
+               ident := table.Identifier{"default", 
"rewrite_with_deletes_test"}
+               tbl, originalFiles := s.createTableWithManyFiles(ident, 3, 
false)
+
+               // Create position delete files
+               mem := memory.DefaultAllocator
+               posDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "file_path", Type: arrow.BinaryTypes.String, 
Nullable: false},
+                       {Name: "pos", Type: arrow.PrimitiveTypes.Int32, 
Nullable: false},
+               }, nil)
+
+               // Create delete data for first file, deleting position 1
+               deleteData, err := array.TableFromJSON(mem, posDeleteSchema, 
[]string{
+                       fmt.Sprintf(`[{"file_path": "%s", "pos": 1}]`, 
originalFiles[0]),
+               })
+               s.Require().NoError(err)
+               defer deleteData.Release()
+
+               // Write position delete file
+               deleteFilePath := s.location + "/deletes/pos_deletes.parquet"
+               fs := s.getFS(tbl)
+               s.writeParquet(fs, deleteFilePath, deleteData)
+
+               // Create equality delete file
+               eqDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "id", Type: arrow.PrimitiveTypes.Int64, 
Nullable: false},
+               }, nil)
+
+               eqDeleteData, err := array.TableFromJSON(mem, eqDeleteSchema, 
[]string{
+                       `[{"id": 2}]`, // Delete row with id=2
+               })
+               s.Require().NoError(err)
+               defer eqDeleteData.Release()
+
+               eqDeleteFilePath := s.location + "/deletes/eq_deletes.parquet"
+               s.writeParquet(fs, eqDeleteFilePath, eqDeleteData)
+
+               // Now perform an operation that would trigger manifest 
rewriting
+               // Add another file to trigger potential manifest consolidation
+               newFilePath := s.location + "/data/additional.parquet"
+               s.writeParquet(fs, newFilePath, s.arrTable)
+
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{newFilePath}, 
nil, false))
+
+               updatedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Verify proper handling of delete file references
+               snapshot := updatedTbl.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               s.Greater(len(manifests), 0, "Should have manifest files")
+
+               // Verify that manifests can be read properly (structure 
validation)
+               for _, manifest := range manifests {
+                       entries, err := manifest.FetchEntries(fs, false)
+                       s.Require().NoError(err)
+                       s.GreaterOrEqual(len(entries), 0, "Should be able to 
read manifest entries")
+
+                       // Log manifest contents for verification
+                       s.T().Logf("Manifest %s has %d entries, content type: 
%v",
+                               manifest.FilePath(), len(entries), 
manifest.ManifestContent())
+               }
+
+               s.T().Log("Successfully handled manifests with delete file 
references")
+       })
+}
+
+func (s *ManifestOperationsTestSuite) TestManifestCleanup() {
+       s.Run("RemoveUnreferencedManifests", func() {
+               // Test cleanup of unreferenced manifest files
+               ident := table.Identifier{"default", "manifest_cleanup_test"}
+
+               // Create table with multiple snapshots to generate 
unreferenced manifests
+               tbl := s.createTable(ident, 2, *iceberg.UnpartitionedSpec, 
iceberg.Properties{
+                       "format-version":              "2",
+                       table.ManifestMergeEnabledKey: "false", // Disable 
merging to create more manifests
+               })
+
+               fs := s.getFS(tbl)
+
+               // Add files in separate transactions to create multiple 
snapshots
+               var allSnapshots []int64
+               for i := 0; i < 3; i++ {
+                       filePath := 
fmt.Sprintf("%s/data/cleanup-test-%d.parquet", s.location, i)
+                       s.writeParquet(fs, filePath, s.arrTable)
+
+                       tx := tbl.NewTransaction()
+                       s.Require().NoError(tx.AddFiles(s.ctx, 
[]string{filePath}, nil, false))
+
+                       committedTbl, err := tx.Commit(s.ctx)
+                       s.Require().NoError(err)
+
+                       snapshot := committedTbl.CurrentSnapshot()
+                       s.Require().NotNil(snapshot)
+                       allSnapshots = append(allSnapshots, snapshot.SnapshotID)
+
+                       tbl = committedTbl
+               }
+
+               s.T().Logf("Created %d snapshots: %v", len(allSnapshots), 
allSnapshots)
+
+               // Get all manifests from all snapshots
+               var allManifestPaths []string
+
+               // Examine current snapshot manifests
+               currentSnapshot := tbl.CurrentSnapshot()
+               s.Require().NotNil(currentSnapshot)
+
+               currentManifests, err := currentSnapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               for _, manifest := range currentManifests {
+                       allManifestPaths = append(allManifestPaths, 
manifest.FilePath())
+               }
+
+               s.T().Logf("Current snapshot has %d manifests", 
len(currentManifests))
+
+               // Test manifest accessibility and structure
+               referencedCount := 0
+               for _, manifestPath := range allManifestPaths {
+                       // Try to read the manifest to verify it's properly 
structured
+                       file, err := fs.Open(manifestPath)
+                       if err != nil {
+                               s.T().Logf("Could not open manifest %s: %v", 
manifestPath, err)
+
+                               continue
+                       }
+                       file.Close()
+                       referencedCount++
+               }
+
+               s.T().Logf("Found %d accessible manifest files", 
referencedCount)
+               s.Greater(referencedCount, 0, "Should have accessible 
referenced manifests")
+
+               // In a real implementation, cleanup would:
+               // 1. Identify all snapshots still referenced by table metadata
+               // 2. Collect all manifests referenced by those snapshots
+               // 3. Find manifests on disk not referenced by any snapshot
+               // 4. Safely delete unreferenced manifests
+
+               // For this test, we verify the basic structure is correct
+               s.T().Log("Manifest cleanup test completed - verified manifest 
accessibility")
+               s.T().Log("Real cleanup would require catalog-level snapshot 
management")
+       })
+
+       s.Run("SafetyChecksForCleanup", func() {

Review Comment:
   same as above, make this a separate function



##########
table/manifest_operations_test.go:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+)
+
+type ManifestOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestManifestOperations(t *testing.T) {
+       suite.Run(t, new(ManifestOperationsTestSuite))
+}
+
+func (s *ManifestOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "value", Type: 
iceberg.PrimitiveTypes.Int32})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "value", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "value": 100},
+                       {"id": 2, "data": "bar", "value": 200},
+                       {"id": 3, "data": "baz", "value": 300}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *ManifestOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *ManifestOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *ManifestOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *ManifestOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *ManifestOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec, props iceberg.Properties) 
*table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, props)
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *ManifestOperationsTestSuite) getFS(tbl *table.Table) 
iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+func (s *ManifestOperationsTestSuite) createTableWithManyFiles(identifier 
table.Identifier, numFiles int, enableMerge bool) (*table.Table, []string) {
+       // Configure table properties for manifest merging
+       props := iceberg.Properties{"format-version": "2"}
+       if enableMerge {
+               // Enable manifest merging with aggressive settings for testing
+               props[table.ManifestMergeEnabledKey] = "true"
+               props[table.ManifestMinMergeCountKey] = "2"      // Merge when 
we have 2+ manifests
+               props[table.ManifestTargetSizeBytesKey] = "4096" // Small 
target size for testing
+       } else {
+               props[table.ManifestMergeEnabledKey] = "false"
+       }
+
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec, props)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       // Create and add files one by one to generate multiple manifest files
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+
+               // Add each file in a separate transaction to create multiple 
manifests
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{filePath}, nil, 
false))
+
+               committedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+               tbl = committedTbl
+       }
+
+       return tbl, files
+}
+
+func TestRewriteManifests(t *testing.T) {
+       suite.Run(t, &ManifestOperationsTestSuite{})
+}
+
+func (s *ManifestOperationsTestSuite) TestRewriteManifests() {
+       s.Run("ConsolidateManifests", func() {
+               // Test consolidating multiple small manifests
+               ident := table.Identifier{"default", 
"consolidate_manifests_test"}
+
+               // Create table with manifest merging disabled first to create 
multiple manifests
+               tblWithoutMerge, _ := s.createTableWithManyFiles(ident, 5, 
false)
+
+               // Get current snapshot to examine manifests
+               snapshot := tblWithoutMerge.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               fs := s.getFS(tblWithoutMerge)
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               initialManifestCount := len(manifests)
+               s.T().Logf("Initial manifest count: %d", initialManifestCount)
+
+               // Verify we have multiple manifests (one per file add 
operation)
+               s.GreaterOrEqual(initialManifestCount, 5, "Should have multiple 
manifests from separate transactions")
+
+               // Now enable manifest merging and perform an operation to 
trigger consolidation
+               ident2 := table.Identifier{"default", 
"consolidate_manifests_merged"}
+               tblWithMerge := s.createTable(ident2, 2, 
*iceberg.UnpartitionedSpec, iceberg.Properties{
+                       "format-version":                 "2",
+                       table.ManifestMergeEnabledKey:    "true",
+                       table.ManifestMinMergeCountKey:   "2",    // Merge when 
2+ manifests
+                       table.ManifestTargetSizeBytesKey: "8192", // Small 
target for testing
+               })
+
+               // Add multiple files in one transaction with merging enabled
+               files := make([]string, 0, 5)
+               for i := 0; i < 5; i++ {
+                       filePath := fmt.Sprintf("%s/data/merged-%d.parquet", 
s.location, i)
+                       s.writeParquet(s.getFS(tblWithMerge), filePath, 
s.arrTable)
+                       files = append(files, filePath)
+               }
+
+               tx := tblWithMerge.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false))
+
+               mergedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Check manifests after merging
+               mergedSnapshot := mergedTbl.CurrentSnapshot()
+               s.Require().NotNil(mergedSnapshot)
+
+               mergedManifests, err := 
mergedSnapshot.Manifests(s.getFS(mergedTbl))
+               s.Require().NoError(err)
+
+               mergedManifestCount := len(mergedManifests)
+               s.T().Logf("Merged manifest count: %d", mergedManifestCount)
+
+               // Verify manifest count reduction (should have fewer manifests 
due to merging)
+               s.LessOrEqual(mergedManifestCount, 3, "Manifest merging should 
reduce manifest count")
+
+               // Verify data integrity
+               scan := mergedTbl.Scan()
+               results, err := scan.ToArrowTable(s.ctx)
+               s.Require().NoError(err)
+               defer results.Release()
+
+               s.Equal(int64(15), results.NumRows(), "Should have all data 
after manifest consolidation")
+
+               s.T().Log("Successfully consolidated manifests and verified 
data integrity")
+       })
+
+       s.Run("RewriteManifestsWithDeletes", func() {
+               // Test rewriting manifests that contain delete files
+               ident := table.Identifier{"default", 
"rewrite_with_deletes_test"}
+               tbl, originalFiles := s.createTableWithManyFiles(ident, 3, 
false)
+
+               // Create position delete files
+               mem := memory.DefaultAllocator
+               posDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "file_path", Type: arrow.BinaryTypes.String, 
Nullable: false},
+                       {Name: "pos", Type: arrow.PrimitiveTypes.Int32, 
Nullable: false},
+               }, nil)
+
+               // Create delete data for first file, deleting position 1
+               deleteData, err := array.TableFromJSON(mem, posDeleteSchema, 
[]string{
+                       fmt.Sprintf(`[{"file_path": "%s", "pos": 1}]`, 
originalFiles[0]),
+               })
+               s.Require().NoError(err)
+               defer deleteData.Release()
+
+               // Write position delete file
+               deleteFilePath := s.location + "/deletes/pos_deletes.parquet"
+               fs := s.getFS(tbl)
+               s.writeParquet(fs, deleteFilePath, deleteData)
+
+               // Create equality delete file
+               eqDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "id", Type: arrow.PrimitiveTypes.Int64, 
Nullable: false},
+               }, nil)
+
+               eqDeleteData, err := array.TableFromJSON(mem, eqDeleteSchema, 
[]string{
+                       `[{"id": 2}]`, // Delete row with id=2
+               })
+               s.Require().NoError(err)
+               defer eqDeleteData.Release()
+
+               eqDeleteFilePath := s.location + "/deletes/eq_deletes.parquet"
+               s.writeParquet(fs, eqDeleteFilePath, eqDeleteData)
+
+               // Now perform an operation that would trigger manifest 
rewriting
+               // Add another file to trigger potential manifest consolidation
+               newFilePath := s.location + "/data/additional.parquet"
+               s.writeParquet(fs, newFilePath, s.arrTable)
+
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{newFilePath}, 
nil, false))
+
+               updatedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Verify proper handling of delete file references
+               snapshot := updatedTbl.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               s.Greater(len(manifests), 0, "Should have manifest files")
+
+               // Verify that manifests can be read properly (structure 
validation)
+               for _, manifest := range manifests {
+                       entries, err := manifest.FetchEntries(fs, false)
+                       s.Require().NoError(err)
+                       s.GreaterOrEqual(len(entries), 0, "Should be able to 
read manifest entries")
+
+                       // Log manifest contents for verification
+                       s.T().Logf("Manifest %s has %d entries, content type: 
%v",
+                               manifest.FilePath(), len(entries), 
manifest.ManifestContent())
+               }
+
+               s.T().Log("Successfully handled manifests with delete file 
references")
+       })
+}
+
+func (s *ManifestOperationsTestSuite) TestManifestCleanup() {
+       s.Run("RemoveUnreferencedManifests", func() {
+               // Test cleanup of unreferenced manifest files
+               ident := table.Identifier{"default", "manifest_cleanup_test"}
+
+               // Create table with multiple snapshots to generate 
unreferenced manifests
+               tbl := s.createTable(ident, 2, *iceberg.UnpartitionedSpec, 
iceberg.Properties{
+                       "format-version":              "2",
+                       table.ManifestMergeEnabledKey: "false", // Disable 
merging to create more manifests
+               })
+
+               fs := s.getFS(tbl)
+
+               // Add files in separate transactions to create multiple 
snapshots
+               var allSnapshots []int64
+               for i := 0; i < 3; i++ {
+                       filePath := 
fmt.Sprintf("%s/data/cleanup-test-%d.parquet", s.location, i)
+                       s.writeParquet(fs, filePath, s.arrTable)
+
+                       tx := tbl.NewTransaction()
+                       s.Require().NoError(tx.AddFiles(s.ctx, 
[]string{filePath}, nil, false))
+
+                       committedTbl, err := tx.Commit(s.ctx)
+                       s.Require().NoError(err)
+
+                       snapshot := committedTbl.CurrentSnapshot()
+                       s.Require().NotNil(snapshot)
+                       allSnapshots = append(allSnapshots, snapshot.SnapshotID)
+
+                       tbl = committedTbl
+               }
+
+               s.T().Logf("Created %d snapshots: %v", len(allSnapshots), 
allSnapshots)
+
+               // Get all manifests from all snapshots
+               var allManifestPaths []string
+
+               // Examine current snapshot manifests
+               currentSnapshot := tbl.CurrentSnapshot()
+               s.Require().NotNil(currentSnapshot)
+
+               currentManifests, err := currentSnapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               for _, manifest := range currentManifests {
+                       allManifestPaths = append(allManifestPaths, 
manifest.FilePath())
+               }
+
+               s.T().Logf("Current snapshot has %d manifests", 
len(currentManifests))
+
+               // Test manifest accessibility and structure
+               referencedCount := 0
+               for _, manifestPath := range allManifestPaths {
+                       // Try to read the manifest to verify it's properly 
structured
+                       file, err := fs.Open(manifestPath)
+                       if err != nil {
+                               s.T().Logf("Could not open manifest %s: %v", 
manifestPath, err)
+
+                               continue
+                       }
+                       file.Close()
+                       referencedCount++
+               }
+
+               s.T().Logf("Found %d accessible manifest files", 
referencedCount)
+               s.Greater(referencedCount, 0, "Should have accessible 
referenced manifests")
+
+               // In a real implementation, cleanup would:
+               // 1. Identify all snapshots still referenced by table metadata
+               // 2. Collect all manifests referenced by those snapshots
+               // 3. Find manifests on disk not referenced by any snapshot
+               // 4. Safely delete unreferenced manifests
+
+               // For this test, we verify the basic structure is correct
+               s.T().Log("Manifest cleanup test completed - verified manifest 
accessibility")
+               s.T().Log("Real cleanup would require catalog-level snapshot 
management")
+       })
+
+       s.Run("SafetyChecksForCleanup", func() {
+               // Test safety checks to prevent deleting referenced manifests
+               ident := table.Identifier{"default", "cleanup_safety_test"}
+               tbl, _ := s.createTableWithManyFiles(ident, 2, false)
+
+               // Get current manifests
+               snapshot := tbl.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               fs := s.getFS(tbl)
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               // Verify all manifests are properly referenced
+               for _, manifest := range manifests {
+                       // Each manifest should be readable and have valid 
structure
+                       entries, err := manifest.FetchEntries(fs, false)
+                       s.Require().NoError(err)
+
+                       // Verify manifest contains expected entries
+                       s.Greater(len(entries), 0, "Referenced manifest should 
have entries")
+
+                       // Verify manifest metadata
+                       s.NotEmpty(manifest.FilePath(), "Manifest should have 
valid path")
+                       s.Greater(manifest.Length(), int64(0), "Manifest should 
have non-zero length")
+
+                       s.T().Logf("Manifest %s: %d entries, %d bytes",
+                               manifest.FilePath(), len(entries), 
manifest.Length())
+               }
+
+               // In a real cleanup implementation, these checks would prevent 
deletion:
+               // 1. Manifest is referenced by current snapshot ✓
+               // 2. Manifest is referenced by other snapshots (check metadata 
history)
+               // 3. Manifest file is not corrupted ✓
+               // 4. Manifest contains valid entries ✓
+
+               s.T().Log("Safety checks passed - all manifests are properly 
referenced and valid")
+       })
+}
+
+func (s *ManifestOperationsTestSuite) TestManifestMetrics() {
+       s.Run("ManifestSizeAndCount", func() {

Review Comment:
   same comment as above



##########
table/manifest_operations_test.go:
##########
@@ -0,0 +1,514 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/arrow-go/v18/parquet/pqarrow"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+)
+
+type ManifestOperationsTestSuite struct {
+       suite.Suite
+       ctx         context.Context
+       location    string
+       tableSchema *iceberg.Schema
+       arrSchema   *arrow.Schema
+       arrTable    arrow.Table
+}
+
+func TestManifestOperations(t *testing.T) {
+       suite.Run(t, new(ManifestOperationsTestSuite))
+}
+
+func (s *ManifestOperationsTestSuite) SetupSuite() {
+       s.ctx = context.Background()
+       mem := memory.DefaultAllocator
+
+       // Create a test schema
+       s.tableSchema = iceberg.NewSchema(0,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int64, Required: true},
+               iceberg.NestedField{ID: 2, Name: "data", Type: 
iceberg.PrimitiveTypes.String},
+               iceberg.NestedField{ID: 3, Name: "value", Type: 
iceberg.PrimitiveTypes.Int32})
+
+       s.arrSchema = arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
+               {Name: "data", Type: arrow.BinaryTypes.String, Nullable: true},
+               {Name: "value", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       // Create test data
+       var err error
+       s.arrTable, err = array.TableFromJSON(mem, s.arrSchema, []string{
+               `[
+                       {"id": 1, "data": "foo", "value": 100},
+                       {"id": 2, "data": "bar", "value": 200},
+                       {"id": 3, "data": "baz", "value": 300}
+               ]`,
+       })
+       s.Require().NoError(err)
+}
+
+func (s *ManifestOperationsTestSuite) SetupTest() {
+       s.location = filepath.ToSlash(strings.Replace(s.T().TempDir(), "#", "", 
-1))
+}
+
+func (s *ManifestOperationsTestSuite) TearDownSuite() {
+       s.arrTable.Release()
+}
+
+func (s *ManifestOperationsTestSuite) getMetadataLoc() string {
+       return fmt.Sprintf("%s/metadata/%05d-%s.metadata.json",
+               s.location, 1, uuid.New().String())
+}
+
+func (s *ManifestOperationsTestSuite) writeParquet(fio iceio.WriteFileIO, 
filePath string, arrTbl arrow.Table) {
+       fo, err := fio.Create(filePath)
+       s.Require().NoError(err)
+
+       s.Require().NoError(pqarrow.WriteTable(arrTbl, fo, arrTbl.NumRows(),
+               nil, pqarrow.DefaultWriterProps()))
+}
+
+func (s *ManifestOperationsTestSuite) createTable(identifier table.Identifier, 
formatVersion int, spec iceberg.PartitionSpec, props iceberg.Properties) 
*table.Table {
+       meta, err := table.NewMetadata(s.tableSchema, &spec, 
table.UnsortedSortOrder,
+               s.location, props)
+       s.Require().NoError(err)
+
+       return table.New(
+               identifier,
+               meta,
+               s.getMetadataLoc(),
+               func(ctx context.Context) (iceio.IO, error) {
+                       return iceio.LocalFS{}, nil
+               },
+               &mockedCatalog{},
+       )
+}
+
+func (s *ManifestOperationsTestSuite) getFS(tbl *table.Table) 
iceio.WriteFileIO {
+       fs, err := tbl.FS(s.ctx)
+       s.Require().NoError(err)
+
+       return fs.(iceio.WriteFileIO)
+}
+
+func (s *ManifestOperationsTestSuite) createTableWithManyFiles(identifier 
table.Identifier, numFiles int, enableMerge bool) (*table.Table, []string) {
+       // Configure table properties for manifest merging
+       props := iceberg.Properties{"format-version": "2"}
+       if enableMerge {
+               // Enable manifest merging with aggressive settings for testing
+               props[table.ManifestMergeEnabledKey] = "true"
+               props[table.ManifestMinMergeCountKey] = "2"      // Merge when 
we have 2+ manifests
+               props[table.ManifestTargetSizeBytesKey] = "4096" // Small 
target size for testing
+       } else {
+               props[table.ManifestMergeEnabledKey] = "false"
+       }
+
+       tbl := s.createTable(identifier, 2, *iceberg.UnpartitionedSpec, props)
+
+       files := make([]string, 0, numFiles)
+       fs := s.getFS(tbl)
+
+       // Create and add files one by one to generate multiple manifest files
+       for i := 0; i < numFiles; i++ {
+               filePath := fmt.Sprintf("%s/data/test-%d.parquet", s.location, 
i)
+               s.writeParquet(fs, filePath, s.arrTable)
+               files = append(files, filePath)
+
+               // Add each file in a separate transaction to create multiple 
manifests
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{filePath}, nil, 
false))
+
+               committedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+               tbl = committedTbl
+       }
+
+       return tbl, files
+}
+
+func TestRewriteManifests(t *testing.T) {
+       suite.Run(t, &ManifestOperationsTestSuite{})
+}
+
+func (s *ManifestOperationsTestSuite) TestRewriteManifests() {
+       s.Run("ConsolidateManifests", func() {
+               // Test consolidating multiple small manifests
+               ident := table.Identifier{"default", 
"consolidate_manifests_test"}
+
+               // Create table with manifest merging disabled first to create 
multiple manifests
+               tblWithoutMerge, _ := s.createTableWithManyFiles(ident, 5, 
false)
+
+               // Get current snapshot to examine manifests
+               snapshot := tblWithoutMerge.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               fs := s.getFS(tblWithoutMerge)
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               initialManifestCount := len(manifests)
+               s.T().Logf("Initial manifest count: %d", initialManifestCount)
+
+               // Verify we have multiple manifests (one per file add 
operation)
+               s.GreaterOrEqual(initialManifestCount, 5, "Should have multiple 
manifests from separate transactions")
+
+               // Now enable manifest merging and perform an operation to 
trigger consolidation
+               ident2 := table.Identifier{"default", 
"consolidate_manifests_merged"}
+               tblWithMerge := s.createTable(ident2, 2, 
*iceberg.UnpartitionedSpec, iceberg.Properties{
+                       "format-version":                 "2",
+                       table.ManifestMergeEnabledKey:    "true",
+                       table.ManifestMinMergeCountKey:   "2",    // Merge when 
2+ manifests
+                       table.ManifestTargetSizeBytesKey: "8192", // Small 
target for testing
+               })
+
+               // Add multiple files in one transaction with merging enabled
+               files := make([]string, 0, 5)
+               for i := 0; i < 5; i++ {
+                       filePath := fmt.Sprintf("%s/data/merged-%d.parquet", 
s.location, i)
+                       s.writeParquet(s.getFS(tblWithMerge), filePath, 
s.arrTable)
+                       files = append(files, filePath)
+               }
+
+               tx := tblWithMerge.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, files, nil, false))
+
+               mergedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Check manifests after merging
+               mergedSnapshot := mergedTbl.CurrentSnapshot()
+               s.Require().NotNil(mergedSnapshot)
+
+               mergedManifests, err := 
mergedSnapshot.Manifests(s.getFS(mergedTbl))
+               s.Require().NoError(err)
+
+               mergedManifestCount := len(mergedManifests)
+               s.T().Logf("Merged manifest count: %d", mergedManifestCount)
+
+               // Verify manifest count reduction (should have fewer manifests 
due to merging)
+               s.LessOrEqual(mergedManifestCount, 3, "Manifest merging should 
reduce manifest count")
+
+               // Verify data integrity
+               scan := mergedTbl.Scan()
+               results, err := scan.ToArrowTable(s.ctx)
+               s.Require().NoError(err)
+               defer results.Release()
+
+               s.Equal(int64(15), results.NumRows(), "Should have all data 
after manifest consolidation")
+
+               s.T().Log("Successfully consolidated manifests and verified 
data integrity")
+       })
+
+       s.Run("RewriteManifestsWithDeletes", func() {
+               // Test rewriting manifests that contain delete files
+               ident := table.Identifier{"default", 
"rewrite_with_deletes_test"}
+               tbl, originalFiles := s.createTableWithManyFiles(ident, 3, 
false)
+
+               // Create position delete files
+               mem := memory.DefaultAllocator
+               posDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "file_path", Type: arrow.BinaryTypes.String, 
Nullable: false},
+                       {Name: "pos", Type: arrow.PrimitiveTypes.Int32, 
Nullable: false},
+               }, nil)
+
+               // Create delete data for first file, deleting position 1
+               deleteData, err := array.TableFromJSON(mem, posDeleteSchema, 
[]string{
+                       fmt.Sprintf(`[{"file_path": "%s", "pos": 1}]`, 
originalFiles[0]),
+               })
+               s.Require().NoError(err)
+               defer deleteData.Release()
+
+               // Write position delete file
+               deleteFilePath := s.location + "/deletes/pos_deletes.parquet"
+               fs := s.getFS(tbl)
+               s.writeParquet(fs, deleteFilePath, deleteData)
+
+               // Create equality delete file
+               eqDeleteSchema := arrow.NewSchema([]arrow.Field{
+                       {Name: "id", Type: arrow.PrimitiveTypes.Int64, 
Nullable: false},
+               }, nil)
+
+               eqDeleteData, err := array.TableFromJSON(mem, eqDeleteSchema, 
[]string{
+                       `[{"id": 2}]`, // Delete row with id=2
+               })
+               s.Require().NoError(err)
+               defer eqDeleteData.Release()
+
+               eqDeleteFilePath := s.location + "/deletes/eq_deletes.parquet"
+               s.writeParquet(fs, eqDeleteFilePath, eqDeleteData)
+
+               // Now perform an operation that would trigger manifest 
rewriting
+               // Add another file to trigger potential manifest consolidation
+               newFilePath := s.location + "/data/additional.parquet"
+               s.writeParquet(fs, newFilePath, s.arrTable)
+
+               tx := tbl.NewTransaction()
+               s.Require().NoError(tx.AddFiles(s.ctx, []string{newFilePath}, 
nil, false))
+
+               updatedTbl, err := tx.Commit(s.ctx)
+               s.Require().NoError(err)
+
+               // Verify proper handling of delete file references
+               snapshot := updatedTbl.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               s.Greater(len(manifests), 0, "Should have manifest files")
+
+               // Verify that manifests can be read properly (structure 
validation)
+               for _, manifest := range manifests {
+                       entries, err := manifest.FetchEntries(fs, false)
+                       s.Require().NoError(err)
+                       s.GreaterOrEqual(len(entries), 0, "Should be able to 
read manifest entries")
+
+                       // Log manifest contents for verification
+                       s.T().Logf("Manifest %s has %d entries, content type: 
%v",
+                               manifest.FilePath(), len(entries), 
manifest.ManifestContent())
+               }
+
+               s.T().Log("Successfully handled manifests with delete file 
references")
+       })
+}
+
+func (s *ManifestOperationsTestSuite) TestManifestCleanup() {
+       s.Run("RemoveUnreferencedManifests", func() {
+               // Test cleanup of unreferenced manifest files
+               ident := table.Identifier{"default", "manifest_cleanup_test"}
+
+               // Create table with multiple snapshots to generate 
unreferenced manifests
+               tbl := s.createTable(ident, 2, *iceberg.UnpartitionedSpec, 
iceberg.Properties{
+                       "format-version":              "2",
+                       table.ManifestMergeEnabledKey: "false", // Disable 
merging to create more manifests
+               })
+
+               fs := s.getFS(tbl)
+
+               // Add files in separate transactions to create multiple 
snapshots
+               var allSnapshots []int64
+               for i := 0; i < 3; i++ {
+                       filePath := 
fmt.Sprintf("%s/data/cleanup-test-%d.parquet", s.location, i)
+                       s.writeParquet(fs, filePath, s.arrTable)
+
+                       tx := tbl.NewTransaction()
+                       s.Require().NoError(tx.AddFiles(s.ctx, 
[]string{filePath}, nil, false))
+
+                       committedTbl, err := tx.Commit(s.ctx)
+                       s.Require().NoError(err)
+
+                       snapshot := committedTbl.CurrentSnapshot()
+                       s.Require().NotNil(snapshot)
+                       allSnapshots = append(allSnapshots, snapshot.SnapshotID)
+
+                       tbl = committedTbl
+               }
+
+               s.T().Logf("Created %d snapshots: %v", len(allSnapshots), 
allSnapshots)
+
+               // Get all manifests from all snapshots
+               var allManifestPaths []string
+
+               // Examine current snapshot manifests
+               currentSnapshot := tbl.CurrentSnapshot()
+               s.Require().NotNil(currentSnapshot)
+
+               currentManifests, err := currentSnapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               for _, manifest := range currentManifests {
+                       allManifestPaths = append(allManifestPaths, 
manifest.FilePath())
+               }
+
+               s.T().Logf("Current snapshot has %d manifests", 
len(currentManifests))
+
+               // Test manifest accessibility and structure
+               referencedCount := 0
+               for _, manifestPath := range allManifestPaths {
+                       // Try to read the manifest to verify it's properly 
structured
+                       file, err := fs.Open(manifestPath)
+                       if err != nil {
+                               s.T().Logf("Could not open manifest %s: %v", 
manifestPath, err)
+
+                               continue
+                       }
+                       file.Close()
+                       referencedCount++
+               }
+
+               s.T().Logf("Found %d accessible manifest files", 
referencedCount)
+               s.Greater(referencedCount, 0, "Should have accessible 
referenced manifests")
+
+               // In a real implementation, cleanup would:
+               // 1. Identify all snapshots still referenced by table metadata
+               // 2. Collect all manifests referenced by those snapshots
+               // 3. Find manifests on disk not referenced by any snapshot
+               // 4. Safely delete unreferenced manifests
+
+               // For this test, we verify the basic structure is correct
+               s.T().Log("Manifest cleanup test completed - verified manifest 
accessibility")
+               s.T().Log("Real cleanup would require catalog-level snapshot 
management")
+       })
+
+       s.Run("SafetyChecksForCleanup", func() {
+               // Test safety checks to prevent deleting referenced manifests
+               ident := table.Identifier{"default", "cleanup_safety_test"}
+               tbl, _ := s.createTableWithManyFiles(ident, 2, false)
+
+               // Get current manifests
+               snapshot := tbl.CurrentSnapshot()
+               s.Require().NotNil(snapshot)
+
+               fs := s.getFS(tbl)
+               manifests, err := snapshot.Manifests(fs)
+               s.Require().NoError(err)
+
+               // Verify all manifests are properly referenced
+               for _, manifest := range manifests {
+                       // Each manifest should be readable and have valid 
structure
+                       entries, err := manifest.FetchEntries(fs, false)
+                       s.Require().NoError(err)
+
+                       // Verify manifest contains expected entries
+                       s.Greater(len(entries), 0, "Referenced manifest should 
have entries")
+
+                       // Verify manifest metadata
+                       s.NotEmpty(manifest.FilePath(), "Manifest should have 
valid path")
+                       s.Greater(manifest.Length(), int64(0), "Manifest should 
have non-zero length")
+
+                       s.T().Logf("Manifest %s: %d entries, %d bytes",
+                               manifest.FilePath(), len(entries), 
manifest.Length())
+               }
+
+               // In a real cleanup implementation, these checks would prevent 
deletion:
+               // 1. Manifest is referenced by current snapshot ✓
+               // 2. Manifest is referenced by other snapshots (check metadata 
history)
+               // 3. Manifest file is not corrupted ✓
+               // 4. Manifest contains valid entries ✓
+
+               s.T().Log("Safety checks passed - all manifests are properly 
referenced and valid")
+       })
+}
+
+func (s *ManifestOperationsTestSuite) TestManifestMetrics() {
+       s.Run("ManifestSizeAndCount", func() {
+               // Test manifest size and count metrics
+
+               // Test with different manifest merge settings
+               testCases := []struct {
+                       name           string
+                       mergeEnabled   bool
+                       minCount       string
+                       targetSize     string
+                       expectedResult string
+               }{
+                       {
+                               name:           "No Merging",
+                               mergeEnabled:   false,
+                               minCount:       "100",
+                               targetSize:     "8388608", // 8MB
+                               expectedResult: "Multiple small manifests",
+                       },
+                       {
+                               name:           "Aggressive Merging",
+                               mergeEnabled:   true,
+                               minCount:       "1",
+                               targetSize:     "1024", // 1KB - very small for 
testing
+                               expectedResult: "Fewer, larger manifests",
+                       },
+                       {
+                               name:           "Conservative Merging",
+                               mergeEnabled:   true,
+                               minCount:       "10",
+                               targetSize:     "8388608", // 8MB
+                               expectedResult: "Limited merging",
+                       },
+               }
+
+               for i, tc := range testCases {
+                       s.Run(tc.name, func() {

Review Comment:
   this is the proper time you would use `s.Run` :smile: for subtests that 
share code between them



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to