This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 13cf142f feat(table): public write DataFile API (#776)
13cf142f is described below
commit 13cf142fe1b1067548a21dde56b0d5da5f220992
Author: Tobias Pütz <[email protected]>
AuthorDate: Thu Mar 19 17:17:38 2026 +0100
feat(table): public write DataFile API (#776)
This exposes a `WriteRecords` function which enables users to write
parquet files outside of an append transaction. This can be useful in
conjunction with `ReplaceDataFilesWithDataFiles` or `AddDataFiles` for
something like compaction or other rewrite operations.
---
internal/utils.go | 8 ++
table/write_records.go | 129 ++++++++++++++++++++++++
table/write_records_test.go | 237 ++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 374 insertions(+)
diff --git a/internal/utils.go b/internal/utils.go
index 9ed69ea4..f7b06f7b 100644
--- a/internal/utils.go
+++ b/internal/utils.go
@@ -164,6 +164,14 @@ func RecoverError(err *error) {
}
}
+func SingleErrorIter[T any](err error) iter.Seq2[T, error] {
+ var z T
+
+ return func(yield func(T, error) bool) {
+ _ = yield(z, err)
+ }
+}
+
func Counter(start int) iter.Seq[int] {
var current atomic.Int64
current.Store(int64(start) - 1)
diff --git a/table/write_records.go b/table/write_records.go
new file mode 100644
index 00000000..f937e999
--- /dev/null
+++ b/table/write_records.go
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table
+
+import (
+ "context"
+ "fmt"
+ "iter"
+ "strconv"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/iceberg-go"
+ "github.com/apache/iceberg-go/internal"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/google/uuid"
+)
+
+// WriteRecordOption configures the behavior of WriteRecords.
+type WriteRecordOption func(*writeRecordConfig)
+
+// writeRecordConfig collects the settings applied by WriteRecordOption values.
+type writeRecordConfig struct {
+ targetFileSize int64 // overrides the table's target file size when > 0; 0 keeps the table default
+ writeUUID *uuid.UUID // explicit UUID for generated data file names; nil when unset
+}
+
+// WithTargetFileSize overrides the table's default target file size.
+func WithTargetFileSize(size int64) WriteRecordOption {
+ return func(c *writeRecordConfig) {
+ c.targetFileSize = size
+ }
+}
+
+// WithWriteUUID sets a specific UUID for file naming.
+func WithWriteUUID(id uuid.UUID) WriteRecordOption {
+ return func(c *writeRecordConfig) {
+ c.writeUUID = &id
+ }
+}
+
+// WriteRecords writes Arrow record batches to Parquet data files for the given
+// table, returning an iterator of the resulting DataFile objects.
+//
+// The provided Arrow schema must be compatible with the table's current
Iceberg
+// schema: each field in the Arrow schema is matched to the table schema by
+// field ID (or by name via the table's name mapping if field IDs are absent).
+// The Arrow schema may be a subset of the table schema (projection), but every
+// field present must have a type that is promotable to the corresponding table
+// field type.
+//
+// WriteRecords releases each RecordBatch it consumes. If the caller needs a
+// batch to remain valid after it has been yielded, it must call Retain before
+// yielding and is then responsible for the corresponding Release.
+func WriteRecords(ctx context.Context, tbl *Table,
+ schema *arrow.Schema,
+ records iter.Seq2[arrow.RecordBatch, error],
+ opts ...WriteRecordOption,
+) iter.Seq2[iceberg.DataFile, error] {
+ if err := checkArrowSchemaCompat(tbl.Schema(), schema, false); err !=
nil {
+ return internal.SingleErrorIter[iceberg.DataFile](
+ fmt.Errorf("arrow schema is not compatible with the
table schema: %w", err))
+ }
+
+ cfg := writeRecordConfig{}
+ for _, opt := range opts {
+ opt(&cfg)
+ }
+
+ fs, err := tbl.fsF(ctx)
+ if err != nil {
+ return internal.SingleErrorIter[iceberg.DataFile](err)
+ }
+
+ writeFS, ok := fs.(iceio.WriteFileIO)
+ if !ok {
+ return
internal.SingleErrorIter[iceberg.DataFile](fmt.Errorf("%w: filesystem does not
support writing", iceberg.ErrNotImplemented))
+ }
+
+ meta, err := MetadataBuilderFromBase(tbl.metadata, tbl.metadataLocation)
+ if err != nil {
+ return
internal.SingleErrorIter[iceberg.DataFile](fmt.Errorf("failed to build
metadata: %w", err))
+ }
+
+ if cfg.targetFileSize > 0 {
+ if meta.props == nil {
+ meta.props = make(iceberg.Properties)
+ }
+ meta.props[WriteTargetFileSizeBytesKey] =
strconv.FormatInt(cfg.targetFileSize, 10)
+ }
+
+ releasing := func(yield func(arrow.RecordBatch, error) bool) {
+ for rec, err := range records {
+ if err != nil {
+ yield(nil, err)
+
+ return
+ }
+ if !yield(rec, nil) {
+ rec.Release()
+
+ return
+ }
+ rec.Release()
+ }
+ }
+
+ args := recordWritingArgs{
+ sc: schema,
+ itr: releasing,
+ fs: writeFS,
+ writeUUID: cfg.writeUUID,
+ }
+
+ return recordsToDataFiles(ctx, tbl.Location(), meta, args)
+}
diff --git a/table/write_records_test.go b/table/write_records_test.go
new file mode 100644
index 00000000..710c5768
--- /dev/null
+++ b/table/write_records_test.go
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package table_test
+
+import (
+ "context"
+ "errors"
+ "path/filepath"
+ "testing"
+
+ "github.com/apache/arrow-go/v18/arrow"
+ "github.com/apache/arrow-go/v18/arrow/array"
+ "github.com/apache/arrow-go/v18/arrow/compute"
+ "github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/iceberg-go"
+ iceio "github.com/apache/iceberg-go/io"
+ "github.com/apache/iceberg-go/table"
+ "github.com/google/uuid"
+ "github.com/stretchr/testify/suite"
+)
+
+// WriteRecordsTestSuite exercises the public table.WriteRecords API using a
+// checked Arrow allocator so that leaked buffers fail the tests.
+type WriteRecordsTestSuite struct {
+ suite.Suite
+
+ mem *memory.CheckedAllocator // checked allocator; TearDownTest asserts zero outstanding bytes
+ ctx context.Context // context carrying the allocator for Arrow compute
+}
+
+func TestWriteRecords(t *testing.T) {
+ suite.Run(t, new(WriteRecordsTestSuite))
+}
+
+func (s *WriteRecordsTestSuite) SetupTest() {
+ s.mem = memory.NewCheckedAllocator(memory.DefaultAllocator)
+ s.ctx = compute.WithAllocator(context.Background(), s.mem)
+}
+
+func (s *WriteRecordsTestSuite) TearDownTest() {
+ s.mem.AssertSize(s.T(), 0)
+}
+
+func (s *WriteRecordsTestSuite) newTable(loc string) *table.Table {
+ iceSch := iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: false},
+ iceberg.NestedField{ID: 2, Name: "name", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+ spec := iceberg.NewPartitionSpec()
+ meta, err := table.NewMetadata(iceSch, &spec, table.UnsortedSortOrder,
loc, iceberg.Properties{})
+ s.Require().NoError(err)
+
+ return table.New(
+ table.Identifier{"test", "write_records"},
+ meta,
+ filepath.Join(loc, "metadata", "v1.metadata.json"),
+ func(ctx context.Context) (iceio.IO, error) { return
iceio.LocalFS{}, nil },
+ nil,
+ )
+}
+
+func (s *WriteRecordsTestSuite) arrowSchema() *arrow.Schema {
+ return arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+ {Name: "name", Type: arrow.BinaryTypes.String, Nullable: true},
+ }, nil)
+}
+
+func (s *WriteRecordsTestSuite) buildRecords(schema *arrow.Schema, numRows
int) arrow.RecordBatch {
+ bldr := array.NewRecordBuilder(s.mem, schema)
+ defer bldr.Release()
+
+ for i := range numRows {
+ bldr.Field(0).(*array.Int32Builder).Append(int32(i))
+ bldr.Field(1).(*array.StringBuilder).Append("value")
+ }
+
+ return bldr.NewRecordBatch()
+}
+
+func (s *WriteRecordsTestSuite) TestBasicWrite() {
+ loc := filepath.ToSlash(s.T().TempDir())
+ tbl := s.newTable(loc)
+ schema := s.arrowSchema()
+
+ records := func(yield func(arrow.RecordBatch, error) bool) {
+ rec := s.buildRecords(schema, 10)
+ yield(rec, nil)
+ }
+
+ var dataFiles []iceberg.DataFile
+ for df, err := range table.WriteRecords(s.ctx, tbl, schema, records) {
+ s.Require().NoError(err)
+ dataFiles = append(dataFiles, df)
+ }
+
+ s.Require().Len(dataFiles, 1)
+ s.Equal(int64(10), dataFiles[0].Count())
+ s.Equal(iceberg.ParquetFile, dataFiles[0].FileFormat())
+ s.Greater(dataFiles[0].FileSizeBytes(), int64(0))
+ s.Contains(dataFiles[0].FilePath(), loc)
+}
+
+func (s *WriteRecordsTestSuite) TestSmallTargetFileSizeProducesMultipleFiles()
{
+ loc := filepath.ToSlash(s.T().TempDir())
+ tbl := s.newTable(loc)
+ schema := s.arrowSchema()
+
+ records := func(yield func(arrow.RecordBatch, error) bool) {
+ for range 10 {
+ rec := s.buildRecords(schema, 100)
+ if !yield(rec, nil) {
+ return
+ }
+ }
+ }
+
+ var totalRows int64
+ var dataFiles []iceberg.DataFile
+ for df, err := range table.WriteRecords(s.ctx, tbl, schema, records,
table.WithTargetFileSize(1)) {
+ s.Require().NoError(err)
+ dataFiles = append(dataFiles, df)
+ totalRows += df.Count()
+ }
+
+ s.Greater(len(dataFiles), 1)
+ s.Equal(int64(1000), totalRows)
+}
+
+func (s *WriteRecordsTestSuite) TestWithWriteUUID() {
+ loc := filepath.ToSlash(s.T().TempDir())
+ tbl := s.newTable(loc)
+ schema := s.arrowSchema()
+
+ writeID := uuid.New()
+ records := func(yield func(arrow.RecordBatch, error) bool) {
+ rec := s.buildRecords(schema, 5)
+ yield(rec, nil)
+ }
+
+ for df, err := range table.WriteRecords(s.ctx, tbl, schema, records,
table.WithWriteUUID(writeID)) {
+ s.Require().NoError(err)
+ s.Contains(df.FilePath(), writeID.String())
+ }
+}
+
+func (s *WriteRecordsTestSuite) TestFSNotWritable() {
+ iceSch := iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: false},
+ )
+ spec := iceberg.NewPartitionSpec()
+ meta, err := table.NewMetadata(iceSch, &spec, table.UnsortedSortOrder,
"/tmp/noop", iceberg.Properties{})
+ s.Require().NoError(err)
+
+ tbl := table.New(
+ table.Identifier{"test", "readonly"},
+ meta,
+ "/tmp/noop/metadata/v1.metadata.json",
+ func(ctx context.Context) (iceio.IO, error) { return
readOnlyFS{}, nil },
+ nil,
+ )
+
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+ }, nil)
+ records := func(yield func(arrow.RecordBatch, error) bool) {}
+
+ for _, err := range table.WriteRecords(s.ctx, tbl, schema, records) {
+ s.Require().ErrorIs(err, iceberg.ErrNotImplemented)
+ }
+}
+
+func (s *WriteRecordsTestSuite) TestFSError() {
+ iceSch := iceberg.NewSchema(1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: false},
+ )
+ spec := iceberg.NewPartitionSpec()
+ meta, err := table.NewMetadata(iceSch, &spec, table.UnsortedSortOrder,
"/tmp/noop", iceberg.Properties{})
+ s.Require().NoError(err)
+
+ fsErr := errors.New("connection refused")
+ tbl := table.New(
+ table.Identifier{"test", "fs_error"},
+ meta,
+ "/tmp/noop/metadata/v1.metadata.json",
+ func(ctx context.Context) (iceio.IO, error) { return nil, fsErr
},
+ nil,
+ )
+
+ schema := arrow.NewSchema([]arrow.Field{
+ {Name: "id", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
+ }, nil)
+ records := func(yield func(arrow.RecordBatch, error) bool) {}
+
+ for _, err := range table.WriteRecords(s.ctx, tbl, schema, records) {
+ s.Require().ErrorIs(err, fsErr)
+ }
+}
+
+func (s *WriteRecordsTestSuite) TestEmptyInput() {
+ loc := filepath.ToSlash(s.T().TempDir())
+ tbl := s.newTable(loc)
+ schema := s.arrowSchema()
+
+ records := func(yield func(arrow.RecordBatch, error) bool) {}
+
+ count := 0
+ for _, err := range table.WriteRecords(s.ctx, tbl, schema, records) {
+ s.Require().NoError(err)
+ count++
+ }
+
+ s.Equal(0, count)
+}
+
+// readOnlyFS is a minimal iceio stub that deliberately does not implement
+// iceio.WriteFileIO, used to exercise the not-writable error path.
+type readOnlyFS struct{}
+
+// Open always fails; reads are not needed by the tests using this stub.
+func (readOnlyFS) Open(name string) (iceio.File, error) {
+ return nil, errors.New("not supported")
+}
+
+// Remove always fails for the same reason.
+func (readOnlyFS) Remove(name string) error {
+ return errors.New("not supported")
+}