This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 13cf142f feat(table): public write DataFile API (#776)
13cf142f is described below

commit 13cf142fe1b1067548a21dde56b0d5da5f220992
Author: Tobias Pütz <[email protected]>
AuthorDate: Thu Mar 19 17:17:38 2026 +0100

    feat(table): public write DataFile API (#776)
    
    This exposes a `WriteRecords` function which enables users to write
    parquet files outside of an append transaction; this can be useful in
    conjunction with `ReplaceDataFilesWithDataFiles` or `AddDataFiles` for
    something like compaction or other rewrite operations.
---
 internal/utils.go           |   8 ++
 table/write_records.go      | 129 ++++++++++++++++++++++++
 table/write_records_test.go | 237 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 374 insertions(+)

diff --git a/internal/utils.go b/internal/utils.go
index 9ed69ea4..f7b06f7b 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -164,6 +164,14 @@ func RecoverError(err *error) {
        }
 }
 
+func SingleErrorIter[T any](err error) iter.Seq2[T, error] {
+       var z T
+
+       return func(yield func(T, error) bool) {
+               _ = yield(z, err)
+       }
+}
+
 func Counter(start int) iter.Seq[int] {
        var current atomic.Int64
        current.Store(int64(start) - 1)
diff --git a/table/write_records.go b/table/write_records.go
new file mode 100644
index 00000000..f937e999
--- /dev/null
+++ b/table/write_records.go
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+       "context"
+       "fmt"
+       "iter"
+       "strconv"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/iceberg-go"
+       "github.com/apache/iceberg-go/internal"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/google/uuid"
+)
+
// WriteRecordOption configures the behavior of WriteRecords.
type WriteRecordOption func(*writeRecordConfig)

// writeRecordConfig holds the resolved options for a single WriteRecords call.
type writeRecordConfig struct {
	// targetFileSize, when > 0, overrides the table's target data file size.
	targetFileSize int64
	// writeUUID, when non-nil, is used when naming the written data files.
	writeUUID      *uuid.UUID
}
+
+// WithTargetFileSize overrides the table's default target file size.
+func WithTargetFileSize(size int64) WriteRecordOption {
+       return func(c *writeRecordConfig) {
+               c.targetFileSize = size
+       }
+}
+
+// WithWriteUUID sets a specific UUID for file naming.
+func WithWriteUUID(id uuid.UUID) WriteRecordOption {
+       return func(c *writeRecordConfig) {
+               c.writeUUID = &id
+       }
+}
+
+// WriteRecords writes Arrow record batches to Parquet data files for the given
+// table, returning an iterator of the resulting DataFile objects.
+//
+// The provided Arrow schema must be compatible with the table's current 
Iceberg
+// schema: each field in the Arrow schema is matched to the table schema by
+// field ID (or by name via the table's name mapping if field IDs are absent).
+// The Arrow schema may be a subset of the table schema (projection), but every
+// field present must have a type that is promotable to the corresponding table
+// field type.
+//
+// WriteRecords releases each RecordBatch it consumes. If the caller needs a
+// batch to remain valid after it has been yielded, it must call Retain before
+// yielding and is then responsible for the corresponding Release.
+func WriteRecords(ctx context.Context, tbl *Table,
+       schema *arrow.Schema,
+       records iter.Seq2[arrow.RecordBatch, error],
+       opts ...WriteRecordOption,
+) iter.Seq2[iceberg.DataFile, error] {
+       if err := checkArrowSchemaCompat(tbl.Schema(), schema, false); err != 
nil {
+               return internal.SingleErrorIter[iceberg.DataFile](
+                       fmt.Errorf("arrow schema is not compatible with the 
table schema: %w", err))
+       }
+
+       cfg := writeRecordConfig{}
+       for _, opt := range opts {
+               opt(&cfg)
+       }
+
+       fs, err := tbl.fsF(ctx)
+       if err != nil {
+               return internal.SingleErrorIter[iceberg.DataFile](err)
+       }
+
+       writeFS, ok := fs.(iceio.WriteFileIO)
+       if !ok {
+               return 
internal.SingleErrorIter[iceberg.DataFile](fmt.Errorf("%w: filesystem does not 
support writing", iceberg.ErrNotImplemented))
+       }
+
+       meta, err := MetadataBuilderFromBase(tbl.metadata, tbl.metadataLocation)
+       if err != nil {
+               return 
internal.SingleErrorIter[iceberg.DataFile](fmt.Errorf("failed to build 
metadata: %w", err))
+       }
+
+       if cfg.targetFileSize > 0 {
+               if meta.props == nil {
+                       meta.props = make(iceberg.Properties)
+               }
+               meta.props[WriteTargetFileSizeBytesKey] = 
strconv.FormatInt(cfg.targetFileSize, 10)
+       }
+
+       releasing := func(yield func(arrow.RecordBatch, error) bool) {
+               for rec, err := range records {
+                       if err != nil {
+                               yield(nil, err)
+
+                               return
+                       }
+                       if !yield(rec, nil) {
+                               rec.Release()
+
+                               return
+                       }
+                       rec.Release()
+               }
+       }
+
+       args := recordWritingArgs{
+               sc:        schema,
+               itr:       releasing,
+               fs:        writeFS,
+               writeUUID: cfg.writeUUID,
+       }
+
+       return recordsToDataFiles(ctx, tbl.Location(), meta, args)
+}
diff --git a/table/write_records_test.go b/table/write_records_test.go
new file mode 100644
index 00000000..710c5768
--- /dev/null
+++ b/table/write_records_test.go
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+       "context"
+       "errors"
+       "path/filepath"
+       "testing"
+
+       "github.com/apache/arrow-go/v18/arrow"
+       "github.com/apache/arrow-go/v18/arrow/array"
+       "github.com/apache/arrow-go/v18/arrow/compute"
+       "github.com/apache/arrow-go/v18/arrow/memory"
+       "github.com/apache/iceberg-go"
+       iceio "github.com/apache/iceberg-go/io"
+       "github.com/apache/iceberg-go/table"
+       "github.com/google/uuid"
+       "github.com/stretchr/testify/suite"
+)
+
// WriteRecordsTestSuite exercises table.WriteRecords end to end against the
// local filesystem, verifying all Arrow memory is released after each test.
type WriteRecordsTestSuite struct {
	suite.Suite

	// mem is a checked allocator so TearDownTest can assert zero leaked bytes.
	mem *memory.CheckedAllocator
	// ctx carries the checked allocator for Arrow compute operations.
	ctx context.Context
}
+
+func TestWriteRecords(t *testing.T) {
+       suite.Run(t, new(WriteRecordsTestSuite))
+}
+
// SetupTest creates a fresh checked allocator and a context that routes Arrow
// compute allocations through it, so leaks are attributable per test.
func (s *WriteRecordsTestSuite) SetupTest() {
	s.mem = memory.NewCheckedAllocator(memory.DefaultAllocator)
	s.ctx = compute.WithAllocator(context.Background(), s.mem)
}
+
// TearDownTest fails the test if any Arrow buffers allocated during the test
// were not released.
func (s *WriteRecordsTestSuite) TearDownTest() {
	s.mem.AssertSize(s.T(), 0)
}
+
+func (s *WriteRecordsTestSuite) newTable(loc string) *table.Table {
+       iceSch := iceberg.NewSchema(1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int32, Required: false},
+               iceberg.NestedField{ID: 2, Name: "name", Type: 
iceberg.PrimitiveTypes.String, Required: false},
+       )
+       spec := iceberg.NewPartitionSpec()
+       meta, err := table.NewMetadata(iceSch, &spec, table.UnsortedSortOrder, 
loc, iceberg.Properties{})
+       s.Require().NoError(err)
+
+       return table.New(
+               table.Identifier{"test", "write_records"},
+               meta,
+               filepath.Join(loc, "metadata", "v1.metadata.json"),
+               func(ctx context.Context) (iceio.IO, error) { return 
iceio.LocalFS{}, nil },
+               nil,
+       )
+}
+
+func (s *WriteRecordsTestSuite) arrowSchema() *arrow.Schema {
+       return arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+               {Name: "name", Type: arrow.BinaryTypes.String, Nullable: true},
+       }, nil)
+}
+
+func (s *WriteRecordsTestSuite) buildRecords(schema *arrow.Schema, numRows 
int) arrow.RecordBatch {
+       bldr := array.NewRecordBuilder(s.mem, schema)
+       defer bldr.Release()
+
+       for i := range numRows {
+               bldr.Field(0).(*array.Int32Builder).Append(int32(i))
+               bldr.Field(1).(*array.StringBuilder).Append("value")
+       }
+
+       return bldr.NewRecordBatch()
+}
+
+func (s *WriteRecordsTestSuite) TestBasicWrite() {
+       loc := filepath.ToSlash(s.T().TempDir())
+       tbl := s.newTable(loc)
+       schema := s.arrowSchema()
+
+       records := func(yield func(arrow.RecordBatch, error) bool) {
+               rec := s.buildRecords(schema, 10)
+               yield(rec, nil)
+       }
+
+       var dataFiles []iceberg.DataFile
+       for df, err := range table.WriteRecords(s.ctx, tbl, schema, records) {
+               s.Require().NoError(err)
+               dataFiles = append(dataFiles, df)
+       }
+
+       s.Require().Len(dataFiles, 1)
+       s.Equal(int64(10), dataFiles[0].Count())
+       s.Equal(iceberg.ParquetFile, dataFiles[0].FileFormat())
+       s.Greater(dataFiles[0].FileSizeBytes(), int64(0))
+       s.Contains(dataFiles[0].FilePath(), loc)
+}
+
+func (s *WriteRecordsTestSuite) TestSmallTargetFileSizeProducesMultipleFiles() 
{
+       loc := filepath.ToSlash(s.T().TempDir())
+       tbl := s.newTable(loc)
+       schema := s.arrowSchema()
+
+       records := func(yield func(arrow.RecordBatch, error) bool) {
+               for range 10 {
+                       rec := s.buildRecords(schema, 100)
+                       if !yield(rec, nil) {
+                               return
+                       }
+               }
+       }
+
+       var totalRows int64
+       var dataFiles []iceberg.DataFile
+       for df, err := range table.WriteRecords(s.ctx, tbl, schema, records, 
table.WithTargetFileSize(1)) {
+               s.Require().NoError(err)
+               dataFiles = append(dataFiles, df)
+               totalRows += df.Count()
+       }
+
+       s.Greater(len(dataFiles), 1)
+       s.Equal(int64(1000), totalRows)
+}
+
+func (s *WriteRecordsTestSuite) TestWithWriteUUID() {
+       loc := filepath.ToSlash(s.T().TempDir())
+       tbl := s.newTable(loc)
+       schema := s.arrowSchema()
+
+       writeID := uuid.New()
+       records := func(yield func(arrow.RecordBatch, error) bool) {
+               rec := s.buildRecords(schema, 5)
+               yield(rec, nil)
+       }
+
+       for df, err := range table.WriteRecords(s.ctx, tbl, schema, records, 
table.WithWriteUUID(writeID)) {
+               s.Require().NoError(err)
+               s.Contains(df.FilePath(), writeID.String())
+       }
+}
+
+func (s *WriteRecordsTestSuite) TestFSNotWritable() {
+       iceSch := iceberg.NewSchema(1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int32, Required: false},
+       )
+       spec := iceberg.NewPartitionSpec()
+       meta, err := table.NewMetadata(iceSch, &spec, table.UnsortedSortOrder, 
"/tmp/noop", iceberg.Properties{})
+       s.Require().NoError(err)
+
+       tbl := table.New(
+               table.Identifier{"test", "readonly"},
+               meta,
+               "/tmp/noop/metadata/v1.metadata.json",
+               func(ctx context.Context) (iceio.IO, error) { return 
readOnlyFS{}, nil },
+               nil,
+       )
+
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+       }, nil)
+       records := func(yield func(arrow.RecordBatch, error) bool) {}
+
+       for _, err := range table.WriteRecords(s.ctx, tbl, schema, records) {
+               s.Require().ErrorIs(err, iceberg.ErrNotImplemented)
+       }
+}
+
+func (s *WriteRecordsTestSuite) TestFSError() {
+       iceSch := iceberg.NewSchema(1,
+               iceberg.NestedField{ID: 1, Name: "id", Type: 
iceberg.PrimitiveTypes.Int32, Required: false},
+       )
+       spec := iceberg.NewPartitionSpec()
+       meta, err := table.NewMetadata(iceSch, &spec, table.UnsortedSortOrder, 
"/tmp/noop", iceberg.Properties{})
+       s.Require().NoError(err)
+
+       fsErr := errors.New("connection refused")
+       tbl := table.New(
+               table.Identifier{"test", "fs_error"},
+               meta,
+               "/tmp/noop/metadata/v1.metadata.json",
+               func(ctx context.Context) (iceio.IO, error) { return nil, fsErr 
},
+               nil,
+       )
+
+       schema := arrow.NewSchema([]arrow.Field{
+               {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+       }, nil)
+       records := func(yield func(arrow.RecordBatch, error) bool) {}
+
+       for _, err := range table.WriteRecords(s.ctx, tbl, schema, records) {
+               s.Require().ErrorIs(err, fsErr)
+       }
+}
+
+func (s *WriteRecordsTestSuite) TestEmptyInput() {
+       loc := filepath.ToSlash(s.T().TempDir())
+       tbl := s.newTable(loc)
+       schema := s.arrowSchema()
+
+       records := func(yield func(arrow.RecordBatch, error) bool) {}
+
+       count := 0
+       for _, err := range table.WriteRecords(s.ctx, tbl, schema, records) {
+               s.Require().NoError(err)
+               count++
+       }
+
+       s.Equal(0, count)
+}
+
+type readOnlyFS struct{}
+
+func (readOnlyFS) Open(name string) (iceio.File, error) {
+       return nil, errors.New("not supported")
+}
+
+func (readOnlyFS) Remove(name string) error {
+       return errors.New("not supported")
+}

Reply via email to