This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b2ea9b0 chore: pass writerFactory to newPartitionedFanoutWriter (#743)
7b2ea9b0 is described below

commit 7b2ea9b0393dccccefeffed93095fc44723066f6
Author: Tobias Pütz <[email protected]>
AuthorDate: Fri Feb 20 18:52:18 2026 +0100

    chore: pass writerFactory to newPartitionedFanoutWriter (#743)
    
    Just a small cleanup to improve ergonomics: the only two call sites of
    newPartitionedFanoutWriter were setting the writers field immediately
    after instantiation. Passing the factory through the constructor makes
    it harder to write incorrect code that forgets to set the field.
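
    For context, the change is plain constructor injection: a required
    dependency moves from a mutable field (set after construction) into a
    constructor parameter, so the type can no longer be built
    half-initialized. A minimal sketch of the idea in Go, with hypothetical
    simplified names rather than the actual iceberg-go types:

        package main

        import "fmt"

        type factory struct{ root string }

        type fanoutWriter struct {
                // Required dependency: taking it in the constructor means a
                // fanoutWriter can never exist without one.
                factory *factory
        }

        // Before this commit the pattern was two steps:
        //   w := newFanoutWriter()
        //   w.factory = &f   // easy to forget
        // After, the dependency is a constructor argument.
        func newFanoutWriter(f *factory) *fanoutWriter {
                return &fanoutWriter{factory: f}
        }

        func main() {
                f := factory{root: "s3://bucket/warehouse"}
                w := newFanoutWriter(&f)
                fmt.Println(w.factory.root)
        }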
---
 table/arrow_utils.go                    |  3 +--
 table/metadata.go                       |  6 +++---
 table/partitioned_fanout_writer.go      | 11 ++++++-----
 table/partitioned_fanout_writer_test.go |  4 +---
 table/rolling_data_writer.go            |  2 +-
 table/snapshot_producers_test.go        | 12 ++++++------
 6 files changed, 18 insertions(+), 20 deletions(-)

diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index 35d3b538..e599735a 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -1360,9 +1360,8 @@ func recordsToDataFiles(ctx context.Context, rootLocation string, meta *Metadata
 
                return writeFiles(ctx, rootLocation, args.fs, meta, nil, tasks)
        } else {
-               partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr)
                rollingDataWriters := NewWriterFactory(rootLocation, args, meta, taskSchema, targetFileSize)
-               partitionWriter.writers = &rollingDataWriters
+               partitionWriter := newPartitionedFanoutWriter(*currentSpec, meta.CurrentSchema(), args.itr, &rollingDataWriters)
                workers := config.EnvConfig.MaxWorkers
 
                return partitionWriter.Write(ctx, workers)
diff --git a/table/metadata.go b/table/metadata.go
index 4df78e32..2dcdf6c1 100644
--- a/table/metadata.go
+++ b/table/metadata.go
@@ -72,7 +72,7 @@ type Metadata interface {
        // table is created. Implementations must throw an exception if a table's
        // UUID does not match the expected UUID after refreshing metadata.
        TableUUID() uuid.UUID
-       // Location is the table's base location. This is used by writers to determine
+       // Location is the table's base location. This is used by writers to determine
        // where to store data files, manifest files, and table metadata files.
        Location() string
        // LastUpdatedMillis is the timestamp in milliseconds from the unix epoch when
@@ -95,7 +95,7 @@ type Metadata interface {
        // PartitionSpecByID returns the partition spec with the given ID. Returns
        // nil if the ID is not found in the list of partition specs.
        PartitionSpecByID(int) *iceberg.PartitionSpec
-       // DefaultPartitionSpec is the ID of the current spec that writers should
+       // DefaultPartitionSpec is the ID of the current spec that writers should
        // use by default.
        DefaultPartitionSpec() int
        // LastPartitionSpecID is the highest assigned partition field ID across
@@ -126,7 +126,7 @@ type Metadata interface {
        SortOrder() SortOrder
        // SortOrders returns the list of sort orders in the table.
        SortOrders() []SortOrder
-       // DefaultSortOrder returns the ID of the current sort order that writers
+       // DefaultSortOrder returns the ID of the current sort order that writers
        // should use by default.
        DefaultSortOrder() int
        // Properties is a string to string map of table properties. This is used
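
The accessors quoted above pair an ID getter with an ID lookup, so resolving
the default spec is a two-step call. A small sketch using only the methods
shown in this hunk, wrapped in a hypothetical helper:

    package sketch

    import (
            "github.com/apache/iceberg-go"
            "github.com/apache/iceberg-go/table"
    )

    // defaultSpec resolves the partition spec writers should use by default,
    // chaining the two Metadata accessors documented above.
    func defaultSpec(meta table.Metadata) *iceberg.PartitionSpec {
            // PartitionSpecByID returns nil for an unknown ID, so callers
            // should nil-check the result.
            return meta.PartitionSpecByID(meta.DefaultPartitionSpec())
    }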
diff --git a/table/partitioned_fanout_writer.go b/table/partitioned_fanout_writer.go
index 838513f5..624c5ad5 100644
--- a/table/partitioned_fanout_writer.go
+++ b/table/partitioned_fanout_writer.go
@@ -39,7 +39,7 @@ type partitionedFanoutWriter struct {
        partitionSpec iceberg.PartitionSpec
        schema        *iceberg.Schema
        itr           iter.Seq2[arrow.RecordBatch, error]
-       writers       *writerFactory
+       writerFactory *writerFactory
 }
 
 // PartitionInfo holds the row indices and partition values for a specific partition,
@@ -51,12 +51,13 @@ type partitionInfo struct {
 }
 
 // NewPartitionedFanoutWriter creates a new PartitionedFanoutWriter with the specified
-// partition specification, schema, and record iterator.
-func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error]) *partitionedFanoutWriter {
+// partition specification, schema, record iterator, and writerFactory.
+func newPartitionedFanoutWriter(partitionSpec iceberg.PartitionSpec, schema *iceberg.Schema, itr iter.Seq2[arrow.RecordBatch, error], writerFactory *writerFactory) *partitionedFanoutWriter {
        return &partitionedFanoutWriter{
                partitionSpec: partitionSpec,
                schema:        schema,
                itr:           itr,
+               writerFactory: writerFactory,
        }
 }
 
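The partitionInfo type above is the core of the fanout: each incoming batch's
rows are bucketed by partition key, and every bucket keeps the row indices
that belong to it. A toy, Arrow-free sketch of that bucketing (hypothetical
types, not the real implementation):

    package sketch

    // rowBucket mirrors the idea of partitionInfo: which rows of a batch
    // fall into one partition.
    type rowBucket struct {
            key     string // e.g. "region=eu/day=2026-02-20"
            indices []int  // row positions within the batch
    }

    // fanoutRows groups row indices by their partition key.
    func fanoutRows(keys []string) map[string]*rowBucket {
            buckets := make(map[string]*rowBucket)
            for i, k := range keys {
                    b, ok := buckets[k]
                    if !ok {
                            b = &rowBucket{key: k}
                            buckets[k] = b
                    }
                    b.indices = append(b.indices, i)
            }
            return buckets
    }
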
@@ -136,7 +137,7 @@ func (p *partitionedFanoutWriter) fanout(ctx context.Context, inputRecordsCh <-c
                                }
 
                                partitionPath := p.partitionPath(val.partitionRec)
-                               rollingDataWriter, err := p.writers.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues, dataFilesChannel)
+                               rollingDataWriter, err := p.writerFactory.getOrCreateRollingDataWriter(ctx, partitionPath, val.partitionValues, dataFilesChannel)
                                if err != nil {
                                        return err
                                }
@@ -157,7 +158,7 @@ func (p *partitionedFanoutWriter) yieldDataFiles(fanoutWorkers *errgroup.Group,
        go func() {
                defer close(outputDataFilesCh)
                err := fanoutWorkers.Wait()
-               err = errors.Join(err, p.writers.closeAll())
+               err = errors.Join(err, p.writerFactory.closeAll())
                errCh <- err
                close(errCh)
        }()
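
The goroutine in this hunk merges two failure sources: errors from the fanout
workers and errors from closing the remaining writers; errors.Join keeps both.
A self-contained illustration of the same shutdown shape, with a stand-in for
closeAll:

    package main

    import (
            "errors"
            "fmt"

            "golang.org/x/sync/errgroup"
    )

    func main() {
            var g errgroup.Group
            g.Go(func() error { return nil }) // a worker that may fail

            // Stand-in for writerFactory.closeAll in the hunk above.
            closeAll := func() error { return errors.New("close failed") }

            errCh := make(chan error, 1)
            go func() {
                    // Wait for every worker, then join the workers' error
                    // with any error from closing writers so neither is lost.
                    err := g.Wait()
                    errCh <- errors.Join(err, closeAll())
                    close(errCh)
            }()

            fmt.Println(<-errCh) // prints "close failed"
    }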
diff --git a/table/partitioned_fanout_writer_test.go b/table/partitioned_fanout_writer_test.go
index b7e0f6d8..734e31c7 100644
--- a/table/partitioned_fanout_writer_test.go
+++ b/table/partitioned_fanout_writer_test.go
@@ -134,10 +134,8 @@ func (s *FanoutWriterTestSuite) testTransformPartition(transform iceberg.Transfo
        taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
        s.Require().NoError(err)
 
-       partitionWriter := newPartitionedFanoutWriter(spec, taskSchema, args.itr)
        rollingDataWriters := NewWriterFactory(loc, args, metaBuilder, icebergSchema, 1024*1024)
-
-       partitionWriter.writers = &rollingDataWriters
+       partitionWriter := newPartitionedFanoutWriter(spec, taskSchema, args.itr, &rollingDataWriters)
        workers := config.EnvConfig.MaxWorkers
 
        dataFiles := partitionWriter.Write(s.ctx, workers)
diff --git a/table/rolling_data_writer.go b/table/rolling_data_writer.go
index 020effdd..35d42ebc 100644
--- a/table/rolling_data_writer.go
+++ b/table/rolling_data_writer.go
@@ -46,7 +46,7 @@ type writerFactory struct {
 }
 
 // NewWriterFactory creates a new WriterFactory with the specified configuration
-// for managing rolling data writers across partitions.
+// for managing rolling data writers across partitions.
 func NewWriterFactory(rootLocation string, args recordWritingArgs, meta *MetadataBuilder, taskSchema *iceberg.Schema, targetFileSize int64) writerFactory {
        nextCount, stopCount := iter.Pull(args.counter)
 
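iter.Pull (Go 1.23) converts a push-style sequence into next/stop functions,
which is how the factory draws file-count values on demand. A minimal
stand-alone example with a hypothetical counter sequence:

    package main

    import (
            "fmt"
            "iter"
    )

    // counter yields 0, 1, 2, ... until the consumer stops.
    func counter() iter.Seq[int] {
            return func(yield func(int) bool) {
                    for i := 0; ; i++ {
                            if !yield(i) {
                                    return
                            }
                    }
            }
    }

    func main() {
            // Pull flips the sequence into pull style: call next() whenever
            // the next value is needed; stop() releases the iterator.
            next, stop := iter.Pull(counter())
            defer stop()

            for range 3 {
                    n, ok := next()
                    fmt.Println(n, ok) // 0 true / 1 true / 2 true
            }
    }
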
diff --git a/table/snapshot_producers_test.go b/table/snapshot_producers_test.go
index 025e0b59..9c008886 100644
--- a/table/snapshot_producers_test.go
+++ b/table/snapshot_producers_test.go
@@ -606,7 +606,7 @@ func TestManifestWriterClosesUnderlyingFile(t *testing.T) {
        require.Len(t, manifests, 1, "should have one manifest")
 
        unclosed := trackIO.GetUnclosedWriters()
-       require.Empty(t, unclosed, "all file writers should be closed, but these are still open: %v", unclosed)
+       require.Empty(t, unclosed, "all file writers should be closed, but these are still open: %v", unclosed)
 }
 
 // TestCreateManifestClosesUnderlyingFile tests that createManifest properly
@@ -636,7 +636,7 @@ func TestCreateManifestClosesUnderlyingFile(t *testing.T) {
        require.NoError(t, err, "createManifest should succeed")
 
        unclosed := trackIO.GetUnclosedWriters()
-       require.Empty(t, unclosed, "all file writers should be closed after createManifest, but these are still open: %v", unclosed)
+       require.Empty(t, unclosed, "all file writers should be closed after createManifest, but these are still open: %v", unclosed)
 }
 
 // TestOverwriteExistingManifestsClosesUnderlyingFile tests that existingManifests
@@ -688,11 +688,11 @@ func TestOverwriteExistingManifestsClosesUnderlyingFile(t *testing.T) {
        require.NoError(t, err, "existingManifests should succeed")
 
        unclosed := trackIO.GetUnclosedWriters()
-       require.Empty(t, unclosed, "all file writers should be closed after existingManifests, but these are still open: %v", unclosed)
+       require.Empty(t, unclosed, "all file writers should be closed after existingManifests, but these are still open: %v", unclosed)
 }
 
 // errorOnDeletedEntries is a producerImpl that returns an error from deletedEntries()
-// to test that file writers are properly closed even when deletedEntries fails.
+// to test that file writers are properly closed even when deletedEntries fails.
 type errorOnDeletedEntries struct {
        base                *snapshotProducer
        err                 error
@@ -743,7 +743,7 @@ func (b *blockingTrackingIO) Create(name string) (iceio.FileWriter, error) {
        return writer, err
 }
 
-// This test verifies that NO writers are created when deletedEntries() fails,
+// This test verifies that NO writers are created when deletedEntries() fails,
 // because the error should be returned before any goroutines start.
 func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) {
        ctx, cancel := context.WithCancel(context.Background())
@@ -781,6 +781,6 @@ func TestManifestsClosesWriterWhenDeletedEntriesFails(t *testing.T) {
 
        case <-time.After(100 * time.Millisecond):
                writerCount := blockingIO.GetWriterCount()
-               require.Zero(t, writerCount, "expected no writers to be created when deletedEntries is called first")
+               require.Zero(t, writerCount, "expected no writers to be created when deletedEntries is called first")
        }
 }
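
These tests rely on a tracking FileIO wrapper (trackIO / blockingIO) that
records writers as they are created and closed; the wrapper itself is not part
of this diff. A minimal sketch of the technique with hypothetical types:

    package sketch

    import "sync"

    // trackingIO records every writer it hands out and forgets it on Close,
    // so a test can assert nothing was left open.
    type trackingIO struct {
            mu   sync.Mutex
            open map[string]struct{}
    }

    func newTrackingIO() *trackingIO {
            return &trackingIO{open: make(map[string]struct{})}
    }

    func (t *trackingIO) Create(name string) *trackedWriter {
            t.mu.Lock()
            defer t.mu.Unlock()
            t.open[name] = struct{}{}
            return &trackedWriter{io: t, name: name}
    }

    // GetUnclosedWriters returns names still open; tests expect it empty.
    func (t *trackingIO) GetUnclosedWriters() []string {
            t.mu.Lock()
            defer t.mu.Unlock()
            names := make([]string, 0, len(t.open))
            for n := range t.open {
                    names = append(names, n)
            }
            return names
    }

    type trackedWriter struct {
            io   *trackingIO
            name string
    }

    func (w *trackedWriter) Close() error {
            w.io.mu.Lock()
            defer w.io.mu.Unlock()
            delete(w.io.open, w.name)
            return nil
    }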
