This is an automated email from the ASF dual-hosted git repository.
zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 676d9dcf feat: Implemented write-default v3 (#779)
676d9dcf is described below
commit 676d9dcfb1ff3fac866fb842fcb8c9537d5935f9
Author: PranjalChaitanya <[email protected]>
AuthorDate: Tue Mar 17 12:56:07 2026 -0400
feat: Implemented write-default v3 (#779)
Part of #589
This PR implements support for write-default values when projecting
Arrow batches in `ToRequestedSchema`. If a column is not provided but
there is a `WriteDefault` specified in the schema, it will create a
column populated with the default value.
To generate the column, this PR uses the method `MakeArrayFromScalar()`.
This is similar to how
[PyIceberg](https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L1998-L2009)
handles write-defaults.
One complication I ran into was converting Iceberg default values into
Arrow scalars. In the schema, WriteDefault is stored as any, which means
the concrete Go type information is lost at compile time. Iceberg
default values are often stored using named Go types such as Date, Time,
or Timestamp, which wrap primitive values like int32 or int64. Arrow’s
scalar helpers (`MakeScalar`, `MakeScalarParam`) infer the scalar type
from the Go value, and since these Iceberg types are not the same as
Arrow’s own types (such as `arrow.Date32`), Arrow may interpret them as
generic numeric scalars instead of their intended logical types.
For example, an iceberg.Date may be interpreted as a generic integer
rather than a date32 scalar. While the numeric value would still be
correct, the resulting Arrow array would have the wrong logical type.
Java and Python implementations don’t run into this issue in the same
way. From what I can tell, Java's core writers do not seem to be using
Arrow. In Python, pa.scalar(value, type=...) explicitly specifies the
Arrow type during scalar construction, so PyArrow does not need to infer
the type from the Python value.
Because Go stores the default as `any` in the schema, some runtime
dispatch is required to normalize the value before constructing the
Arrow scalar. The implementation in this PR handles those cases to
ensure the resulting Arrow array matches the schema’s logical type.
I added tests covering write-default behavior across the different Iceberg types.
If there is a simpler or more idiomatic way to perform this conversion
within the Arrow or Iceberg-Go codebase, I would be very open to
changing the implementation.
---
table/arrow_scanner.go | 4 +-
table/arrow_utils.go | 157 ++++-
table/arrow_utils_test.go | 1003 +++++++++++++++++++++++++++++++-
table/metadata_schema_compatibility.go | 4 +-
table/rolling_data_writer.go | 2 +-
table/writer.go | 2 +-
6 files changed, 1159 insertions(+), 13 deletions(-)
diff --git a/table/arrow_scanner.go b/table/arrow_scanner.go
index 30d56477..42777adb 100644
--- a/table/arrow_scanner.go
+++ b/table/arrow_scanner.go
@@ -510,7 +510,7 @@ func (as *arrowScan) recordsFromTask(ctx context.Context,
task internal.Enumerat
pipeline = append(pipeline, func(r arrow.RecordBatch)
(arrow.RecordBatch, error) {
defer r.Release()
- return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r,
false, false, as.useLargeTypes)
+ return ToRequestedSchema(ctx, as.projectedSchema, iceSchema, r,
SchemaOptions{UseLargeTypes: as.useLargeTypes})
})
err = as.processRecords(ctx, task, iceSchema, rdr, colIndices,
pipeline, out)
@@ -582,7 +582,7 @@ func (as *arrowScan) producePosDeletesFromTask(ctx
context.Context, task interna
pipeline = append(pipeline, func(r arrow.RecordBatch)
(arrow.RecordBatch, error) {
defer r.Release()
- return ToRequestedSchema(ctx, iceberg.PositionalDeleteSchema,
enrichedIcebergSchema, r, false, true, as.useLargeTypes)
+ return ToRequestedSchema(ctx, iceberg.PositionalDeleteSchema,
enrichedIcebergSchema, r, SchemaOptions{IncludeFieldIDs: true, UseLargeTypes:
as.useLargeTypes})
})
err = as.processRecords(ctx, task, iceSchema, rdr, colIndices,
pipeline, out)
diff --git a/table/arrow_utils.go b/table/arrow_utils.go
index dc548307..d7f0ee89 100644
--- a/table/arrow_utils.go
+++ b/table/arrow_utils.go
@@ -19,6 +19,8 @@ package table
import (
"context"
+ "encoding/base64"
+ "encoding/json"
"fmt"
"iter"
"slices"
@@ -29,8 +31,10 @@ import (
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/compute"
+ "github.com/apache/arrow-go/v18/arrow/decimal128"
"github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/arrow-go/v18/arrow/memory"
+ "github.com/apache/arrow-go/v18/arrow/scalar"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/config"
"github.com/apache/iceberg-go/internal"
@@ -715,12 +719,139 @@ func retOrPanic[T any](v T, err error) T {
return v
}
+// numericDefault converts v to T, accepting the typed iceberg form, the
+// float64 that encoding/json produces when deserializing into any, or the
+// json.Number that a decoder configured with UseNumber() produces.
+func numericDefault[T ~int32 | ~int64 | ~float32 | ~float64](v any) T {
+ switch val := v.(type) {
+ case T:
+ return val
+ case float64:
+ return T(val)
+ case json.Number:
+ f, err := val.Float64()
+ if err != nil {
+ panic(fmt.Errorf("unsupported json.Number %q for
numeric iceberg type: %w", val, err))
+ }
+
+ return T(f)
+ }
+ panic(fmt.Errorf("unsupported write-default value type %T for numeric
iceberg type", v))
+}
+
+// defaultToScalar converts an Iceberg default value to an Arrow scalar.
+func defaultToScalar(v any, t iceberg.Type, dt arrow.DataType) scalar.Scalar {
+ switch typ := t.(type) {
+ case iceberg.Float32Type:
+ s, err := scalar.MakeScalarParam(numericDefault[float32](v), dt)
+ if err != nil {
+ panic(fmt.Errorf("write-default float32 (iceberg type
%s, value %v %T): %w", t, v, v, err))
+ }
+
+ return s
+ case iceberg.DateType:
+ return
scalar.NewDate32Scalar(arrow.Date32(numericDefault[iceberg.Date](v)))
+ case iceberg.TimeType:
+ return
scalar.NewTime64Scalar(arrow.Time64(numericDefault[iceberg.Time](v)), dt)
+ case iceberg.TimestampType, iceberg.TimestampTzType:
+ return
scalar.NewTimestampScalar(arrow.Timestamp(numericDefault[iceberg.Timestamp](v)),
dt)
+ case iceberg.TimestampNsType, iceberg.TimestampTzNsType:
+ return
scalar.NewTimestampScalar(arrow.Timestamp(numericDefault[iceberg.TimestampNano](v)),
dt)
+ case iceberg.UUIDType:
+ switch val := v.(type) {
+ case uuid.UUID:
+ s, err := scalar.MakeScalarParam(val[:],
&arrow.FixedSizeBinaryType{ByteWidth: 16})
+ if err != nil {
+ panic(fmt.Errorf("write-default uuid (value
%v): %w", val, err))
+ }
+
+ return s
+ case string:
+ u, err := uuid.Parse(val)
+ if err != nil {
+ panic(fmt.Errorf("write-default uuid: cannot
parse string %q: %w", val, err))
+ }
+ s, err := scalar.MakeScalarParam(u[:],
&arrow.FixedSizeBinaryType{ByteWidth: 16})
+ if err != nil {
+ panic(fmt.Errorf("write-default uuid (value
%v): %w", val, err))
+ }
+
+ return s
+ }
+ panic(fmt.Errorf("write-default uuid: unsupported value type %T
(%v)", v, v))
+ case iceberg.DecimalType:
+ switch val := v.(type) {
+ case iceberg.Decimal:
+ return scalar.NewDecimal128Scalar(val.Val, dt)
+ case string:
+ n, err := decimal128.FromString(val,
int32(typ.Precision()), int32(typ.Scale()))
+ if err != nil {
+ panic(fmt.Errorf("write-default decimal(p=%d,
s=%d): cannot parse string %q: %w", typ.Precision(), typ.Scale(), val, err))
+ }
+
+ return scalar.NewDecimal128Scalar(n, dt)
+ }
+ panic(fmt.Errorf("write-default decimal: unsupported value type
%T (%v)", v, v))
+ case iceberg.BinaryType, iceberg.FixedType:
+ switch val := v.(type) {
+ case []byte:
+ s, err := scalar.MakeScalarParam(val, dt)
+ if err != nil {
+ panic(fmt.Errorf("write-default binary/fixed
(iceberg type %s, value %v): %w", t, val, err))
+ }
+
+ return s
+ case string:
+ b, err := base64.StdEncoding.DecodeString(val)
+ if err != nil {
+ panic(fmt.Errorf("write-default binary/fixed
(iceberg type %s): cannot base64-decode string %q: %w", t, val, err))
+ }
+ s, err := scalar.MakeScalarParam(b, dt)
+ if err != nil {
+ panic(fmt.Errorf("write-default binary/fixed
(iceberg type %s, value %v): %w", t, b, err))
+ }
+
+ return s
+ }
+ panic(fmt.Errorf("write-default binary/fixed: unsupported value
type %T (%v)", v, v))
+ // Float64, Bool, and String cast normally.
+ // Int32 and Int64 arrive as float64 from JSON and are handled by
MakeScalarParam.
+ default:
+ s, err := scalar.MakeScalarParam(v, dt)
+ if err != nil {
+ panic(fmt.Errorf("write-default (iceberg type %s, value
%v %T): %w", t, v, v, err))
+ }
+
+ return s
+ }
+}
+
+// defaultToArray creates an Arrow array of length n filled with the given
default value v.
+func defaultToArray(v any, t iceberg.Type, dt arrow.DataType, n int, alloc
memory.Allocator) arrow.Array {
+ sc := defaultToScalar(v, t, dt)
+ out, err := scalar.MakeArrayFromScalar(sc, n, alloc)
+ if err != nil {
+ panic(fmt.Errorf("write-default (iceberg type %s, value %v %T):
failed to create array: %w", t, v, v, err))
+ }
+ if _, ok := dt.(*extensions.UUIDType); ok {
+ defer out.Release()
+
+ data := array.NewData(dt, out.Len(), out.Data().Buffers(), nil,
out.NullN(), 0)
+ defer data.Release()
+
+ return array.MakeFromData(data)
+ }
+
+ return out
+}
+
type arrowProjectionVisitor struct {
ctx context.Context
fileSchema *iceberg.Schema
includeFieldIDs bool
downcastNsTimestamp bool
useLargeTypes bool
+ useWriteDefault bool
}
func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals
arrow.Array) arrow.Array {
@@ -832,7 +963,14 @@ func (a *arrowProjectionVisitor) Struct(st
iceberg.StructType, structArr arrow.A
} else if !field.Required {
dt := retOrPanic(TypeToArrowType(field.Type, false,
a.useLargeTypes))
- arr =
array.MakeArrayOfNull(compute.GetAllocator(a.ctx), dt, structArr.Len())
+ if field.WriteDefault != nil && a.useWriteDefault {
+ arr = defaultToArray(field.WriteDefault,
field.Type, dt, structArr.Len(), compute.GetAllocator(a.ctx))
+ } else if field.InitialDefault != nil &&
!a.useWriteDefault {
+ arr = defaultToArray(field.InitialDefault,
field.Type, dt, structArr.Len(), compute.GetAllocator(a.ctx))
+ } else {
+ arr =
array.MakeArrayOfNull(compute.GetAllocator(a.ctx), dt, structArr.Len())
+ }
+
defer arr.Release()
fieldArrs[i] = arr
fields[i] = a.constructField(field, arr.DataType())
@@ -928,9 +1066,17 @@ func (a *arrowProjectionVisitor) Primitive(_
iceberg.PrimitiveType, arr arrow.Ar
return arr
}
+// SchemaOptions controls the behaviour of ToRequestedSchema.
+type SchemaOptions struct {
+ DowncastTimestamp bool
+ IncludeFieldIDs bool
+ UseLargeTypes bool
+ UseWriteDefault bool
+}
+
// ToRequestedSchema will construct a new record batch matching the requested
iceberg schema
// casting columns if necessary as appropriate.
-func ToRequestedSchema(ctx context.Context, requested, fileSchema
*iceberg.Schema, batch arrow.RecordBatch, downcastTimestamp, includeFieldIDs,
useLargeTypes bool) (arrow.RecordBatch, error) {
+func ToRequestedSchema(ctx context.Context, requested, fileSchema
*iceberg.Schema, batch arrow.RecordBatch, opts SchemaOptions)
(arrow.RecordBatch, error) {
st := array.RecordToStructArray(batch)
defer st.Release()
@@ -938,9 +1084,10 @@ func ToRequestedSchema(ctx context.Context, requested,
fileSchema *iceberg.Schem
&arrowProjectionVisitor{
ctx: ctx,
fileSchema: fileSchema,
- includeFieldIDs: includeFieldIDs,
- downcastNsTimestamp: downcastTimestamp,
- useLargeTypes: useLargeTypes,
+ includeFieldIDs: opts.IncludeFieldIDs,
+ downcastNsTimestamp: opts.DowncastTimestamp,
+ useLargeTypes: opts.UseLargeTypes,
+ useWriteDefault: opts.UseWriteDefault,
}, arrowAccessor{fileSchema: fileSchema})
if err != nil {
return nil, err
diff --git a/table/arrow_utils_test.go b/table/arrow_utils_test.go
index 0f41b22c..833f6ee7 100644
--- a/table/arrow_utils_test.go
+++ b/table/arrow_utils_test.go
@@ -20,16 +20,20 @@ package table_test
import (
"bufio"
"context"
+ "encoding/base64"
+ "encoding/json"
"strings"
"testing"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute"
+ "github.com/apache/arrow-go/v18/arrow/decimal128"
"github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/table"
+ "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -529,7 +533,7 @@ func TestToRequestedSchemaTimestamps(t *testing.T) {
requestedSchema := TableSchemaWithAllMicrosecondsTimestampPrec
fileSchema := requestedSchema
- converted, err := table.ToRequestedSchema(ctx, requestedSchema,
fileSchema, batch, true, false, false)
+ converted, err := table.ToRequestedSchema(ctx, requestedSchema,
fileSchema, batch, table.SchemaOptions{DowncastTimestamp: true})
require.NoError(t, err)
defer converted.Release()
@@ -579,9 +583,1004 @@ func TestToRequestedSchema(t *testing.T) {
icesc, err := table.ArrowSchemaToIceberg(schema, false, nil)
require.NoError(t, err)
- rec2, err := table.ToRequestedSchema(context.Background(), icesc,
icesc, rec, true, true, false)
+ rec2, err := table.ToRequestedSchema(context.Background(), icesc,
icesc, rec, table.SchemaOptions{DowncastTimestamp: true, IncludeFieldIDs: true})
require.NoError(t, err)
defer rec2.Release()
assert.True(t, array.RecordEqual(rec, rec2))
}
+
+func TestToRequestedSchemaWriteDefaults(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ fileIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+
+ requestedIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 2, Name: "event_date", Type:
iceberg.PrimitiveTypes.Date, Required: false, WriteDefault: iceberg.Date(1234)},
+ )
+
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {
+ Name: "id",
+ Type: arrow.PrimitiveTypes.Int32,
+ Nullable: false,
+ Metadata:
arrow.MetadataFrom(map[string]string{table.ArrowParquetFieldIDKey: "1"}),
+ },
+ }, nil)
+ bldr := array.NewRecordBuilder(mem, arrowSchema)
+ defer bldr.Release()
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil)
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ result, err := table.ToRequestedSchema(ctx, requestedIceSchema,
fileIceSchema, rec, table.SchemaOptions{UseWriteDefault: true})
+ require.NoError(t, err)
+ defer result.Release()
+
+ require.EqualValues(t, 2, result.NumCols())
+ dateCol := result.Column(1)
+ require.Equal(t, arrow.DATE32, dateCol.DataType().ID(), "expected
date32 column, got %s", dateCol.DataType())
+ require.Equal(t, 3, dateCol.Len())
+ dateArr := dateCol.(*array.Date32)
+ for i := 0; i < dateArr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(1234), dateArr.Value(i), "row %d
should have write-default value", i)
+ }
+}
+
+func TestToRequestedSchemaWriteDefaultsTypes(t *testing.T) {
+ ctx := context.Background()
+
+ buildBaseRecord := func(mem memory.Allocator) arrow.RecordBatch {
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {
+ Name: "id",
+ Type: arrow.PrimitiveTypes.Int32,
+ Nullable: false,
+ Metadata:
arrow.MetadataFrom(map[string]string{table.ArrowParquetFieldIDKey: "1"}),
+ },
+ }, nil)
+ bldr := array.NewRecordBuilder(mem, arrowSchema)
+ defer bldr.Release()
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2,
3}, nil)
+
+ return bldr.NewRecordBatch()
+ }
+
+ fileIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+
+ tests := []struct {
+ name string
+ field iceberg.NestedField
+ check func(t *testing.T, col arrow.Array)
+ }{
+ {
+ name: "time write-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.Time,
+ Required: false, WriteDefault:
iceberg.Time(5000000),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIME64,
col.DataType().ID())
+ timeArr := col.(*array.Time64)
+ for i := 0; i < timeArr.Len(); i++ {
+ assert.Equal(t, arrow.Time64(5000000),
timeArr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamp write-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.Timestamp,
+ Required: false, WriteDefault:
iceberg.Timestamp(1700000000000000),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ tsArr := col.(*array.Timestamp)
+ for i := 0; i < tsArr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000), tsArr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "uuid write-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "uid", Type: iceberg.UUIDType{},
+ Required: false, WriteDefault:
uuid.MustParse("f79c3e09-677c-4bbd-a479-512f87f77acf"),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.EXTENSION,
col.DataType().ID())
+ uuidArr := col.(*extensions.UUIDArray)
+ expected :=
uuid.MustParse("f79c3e09-677c-4bbd-a479-512f87f77acf")
+ for i := 0; i < uuidArr.Len(); i++ {
+ assert.Equal(t, expected,
uuidArr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "decimal write-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "price", Type:
iceberg.DecimalTypeOf(10, 2),
+ Required: false, WriteDefault:
iceberg.Decimal{Val: decimal128.New(0, 12345), Scale: 2},
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DECIMAL128,
col.DataType().ID())
+ decArr := col.(*array.Decimal128)
+ for i := 0; i < decArr.Len(); i++ {
+ assert.Equal(t, decimal128.New(0,
12345), decArr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "bool write-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "flag", Type:
iceberg.PrimitiveTypes.Bool,
+ Required: false, WriteDefault: true,
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.BOOL,
col.DataType().ID())
+ boolArr := col.(*array.Boolean)
+ for i := 0; i < boolArr.Len(); i++ {
+ assert.True(t, boolArr.Value(i), "row
%d", i)
+ }
+ },
+ },
+ {
+ name: "string write-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "label", Type:
iceberg.PrimitiveTypes.String,
+ Required: false, WriteDefault: "hello",
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ strArr := col.(*array.String)
+ for i := 0; i < strArr.Len(); i++ {
+ assert.Equal(t, "hello",
strArr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ // initial-default is set but must be ignored on the
write path
+ name: "ignores initial-default when write-default is
set",
+ field: iceberg.NestedField{
+ ID: 2, Name: "dt", Type:
iceberg.PrimitiveTypes.Date,
+ Required: false, InitialDefault:
iceberg.Date(100), WriteDefault: iceberg.Date(999),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ arr := col.(*array.Date32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(999),
arr.Value(i), "row %d: should use write-default (999), not initial-default
(100)", i)
+ }
+ },
+ },
+ {
+ // json.Number is produced when metadata is decoded via
json.Decoder.UseNumber()
+ name: "date write-default as json.Number",
+ field: iceberg.NestedField{
+ ID: 2, Name: "dt", Type:
iceberg.PrimitiveTypes.Date,
+ Required: false, WriteDefault:
json.Number("1234"),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ arr := col.(*array.Date32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(1234),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ mem :=
memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ requestedIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ tt.field,
+ )
+
+ rec := buildBaseRecord(mem)
+ defer rec.Release()
+
+ result, err := table.ToRequestedSchema(ctx,
requestedIceSchema, fileIceSchema, rec, table.SchemaOptions{UseWriteDefault:
true})
+ require.NoError(t, err)
+ defer result.Release()
+
+ require.EqualValues(t, 2, result.NumCols())
+ tt.check(t, result.Column(1))
+ })
+ }
+}
+
+// TestToRequestedSchemaInitialDefaults is the read-path equivalent of
+// TestToRequestedSchemaWriteDefaults: a missing column is filled using
+// InitialDefault (useWriteDefault=false).
+func TestToRequestedSchemaInitialDefaults(t *testing.T) {
+ ctx := context.Background()
+ mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ fileIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+
+ requestedIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ iceberg.NestedField{ID: 2, Name: "event_date", Type:
iceberg.PrimitiveTypes.Date, Required: false, InitialDefault:
iceberg.Date(1234)},
+ )
+
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {
+ Name: "id",
+ Type: arrow.PrimitiveTypes.Int32,
+ Nullable: false,
+ Metadata:
arrow.MetadataFrom(map[string]string{table.ArrowParquetFieldIDKey: "1"}),
+ },
+ }, nil)
+ bldr := array.NewRecordBuilder(mem, arrowSchema)
+ defer bldr.Release()
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3}, nil)
+ rec := bldr.NewRecordBatch()
+ defer rec.Release()
+
+ result, err := table.ToRequestedSchema(ctx, requestedIceSchema,
fileIceSchema, rec, table.SchemaOptions{})
+ require.NoError(t, err)
+ defer result.Release()
+
+ require.EqualValues(t, 2, result.NumCols())
+ dateCol := result.Column(1)
+ require.Equal(t, arrow.DATE32, dateCol.DataType().ID(), "expected
date32 column, got %s", dateCol.DataType())
+ require.Equal(t, 3, dateCol.Len())
+ dateArr := dateCol.(*array.Date32)
+ for i := 0; i < dateArr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(1234), dateArr.Value(i), "row %d
should have initial-default value", i)
+ }
+}
+
+// TestToRequestedSchemaInitialDefaultTypes is the read-path equivalent of
+// TestToRequestedSchemaWriteDefaultsTypes: covers all Iceberg types using
+// programmatically constructed NestedFields with InitialDefault set.
+func TestToRequestedSchemaInitialDefaultTypes(t *testing.T) {
+ ctx := context.Background()
+
+ buildBaseRecord := func(mem memory.Allocator) arrow.RecordBatch {
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {
+ Name: "id",
+ Type: arrow.PrimitiveTypes.Int32,
+ Nullable: false,
+ Metadata:
arrow.MetadataFrom(map[string]string{table.ArrowParquetFieldIDKey: "1"}),
+ },
+ }, nil)
+ bldr := array.NewRecordBuilder(mem, arrowSchema)
+ defer bldr.Release()
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2,
3}, nil)
+
+ return bldr.NewRecordBatch()
+ }
+
+ fileIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+
+ tests := []struct {
+ name string
+ field iceberg.NestedField
+ check func(t *testing.T, col arrow.Array)
+ }{
+ {
+ name: "date initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "dt", Type:
iceberg.PrimitiveTypes.Date,
+ Required: false, InitialDefault:
iceberg.Date(1234),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ arr := col.(*array.Date32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(1234),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "time initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.Time,
+ Required: false, InitialDefault:
iceberg.Time(5000000),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIME64,
col.DataType().ID())
+ arr := col.(*array.Time64)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Time64(5000000),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamp initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "ts", Type:
iceberg.PrimitiveTypes.Timestamp,
+ Required: false, InitialDefault:
iceberg.Timestamp(1700000000000000),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "uuid initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "uid", Type: iceberg.UUIDType{},
+ Required: false, InitialDefault:
uuid.MustParse("f79c3e09-677c-4bbd-a479-512f87f77acf"),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.EXTENSION,
col.DataType().ID())
+ arr := col.(*extensions.UUIDArray)
+ expected :=
uuid.MustParse("f79c3e09-677c-4bbd-a479-512f87f77acf")
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, expected, arr.Value(i),
"row %d", i)
+ }
+ },
+ },
+ {
+ name: "decimal initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "price", Type:
iceberg.DecimalTypeOf(10, 2),
+ Required: false, InitialDefault:
iceberg.Decimal{Val: decimal128.New(0, 12345), Scale: 2},
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DECIMAL128,
col.DataType().ID())
+ arr := col.(*array.Decimal128)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, decimal128.New(0,
12345), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "bool initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "flag", Type:
iceberg.PrimitiveTypes.Bool,
+ Required: false, InitialDefault: true,
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.BOOL,
col.DataType().ID())
+ arr := col.(*array.Boolean)
+ for i := 0; i < arr.Len(); i++ {
+ assert.True(t, arr.Value(i), "row %d",
i)
+ }
+ },
+ },
+ {
+ name: "string initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "label", Type:
iceberg.PrimitiveTypes.String,
+ Required: false, InitialDefault: "hello",
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ arr := col.(*array.String)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, "hello", arr.Value(i),
"row %d", i)
+ }
+ },
+ },
+ {
+ name: "binary initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "data", Type:
iceberg.PrimitiveTypes.Binary,
+ Required: false, InitialDefault:
[]byte("hello"),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.BINARY,
col.DataType().ID())
+ arr := col.(*array.Binary)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, []byte("hello"),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "int32 initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "n", Type:
iceberg.PrimitiveTypes.Int32,
+ Required: false, InitialDefault: int32(42),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.INT32,
col.DataType().ID())
+ arr := col.(*array.Int32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, int32(42),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "int64 initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "n", Type:
iceberg.PrimitiveTypes.Int64,
+ Required: false, InitialDefault: int64(9999),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.INT64,
col.DataType().ID())
+ arr := col.(*array.Int64)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, int64(9999),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ // write-default is set but must be ignored on the read
path
+ name: "ignores write-default when initial-default is
set",
+ field: iceberg.NestedField{
+ ID: 2, Name: "dt", Type:
iceberg.PrimitiveTypes.Date,
+ Required: false, InitialDefault:
iceberg.Date(100), WriteDefault: iceberg.Date(999),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ arr := col.(*array.Date32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(100),
arr.Value(i), "row %d: should use initial-default (100), not write-default
(999)", i)
+ }
+ },
+ },
+ {
+ // json.Number is produced when metadata is decoded via
json.Decoder.UseNumber()
+ name: "date initial-default as json.Number",
+ field: iceberg.NestedField{
+ ID: 2, Name: "dt", Type:
iceberg.PrimitiveTypes.Date,
+ Required: false, InitialDefault:
json.Number("1234"),
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ arr := col.(*array.Date32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(1234),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ // no default set at all — column must be null
+ name: "falls back to null when no initial-default",
+ field: iceberg.NestedField{
+ ID: 2, Name: "dt", Type:
iceberg.PrimitiveTypes.Date,
+ Required: false,
+ },
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ for i := 0; i < col.Len(); i++ {
+ assert.True(t, col.IsNull(i), "row %d
should be null", i)
+ }
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ mem :=
memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ requestedIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ tt.field,
+ )
+
+ rec := buildBaseRecord(mem)
+ defer rec.Release()
+
+ result, err := table.ToRequestedSchema(ctx,
requestedIceSchema, fileIceSchema, rec, table.SchemaOptions{})
+ require.NoError(t, err)
+ defer result.Release()
+
+ require.EqualValues(t, 2, result.NumCols())
+ tt.check(t, result.Column(1))
+ })
+ }
+}
+
+// TestToRequestedSchemaInitialDefaultJSONRoundTrip is the read-path equivalent
+// of TestToRequestedSchemaWriteDefaultJSONRoundTrip. It unmarshals
NestedFields
+// from JSON (as they arrive from a REST catalog or metadata file) and verifies
+// that initial-default values are projected correctly with
useWriteDefault=false.
+func TestToRequestedSchemaInitialDefaultJSONRoundTrip(t *testing.T) {
+ ctx := context.Background()
+
+ fileIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+
+ buildBaseRecord := func(mem memory.Allocator) arrow.RecordBatch {
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {
+ Name: "id",
+ Type: arrow.PrimitiveTypes.Int32,
+ Nullable: false,
+ Metadata:
arrow.MetadataFrom(map[string]string{table.ArrowParquetFieldIDKey: "1"}),
+ },
+ }, nil)
+ bldr := array.NewRecordBuilder(mem, arrowSchema)
+ defer bldr.Release()
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2,
3}, nil)
+
+ return bldr.NewRecordBatch()
+ }
+
+ unmarshalField := func(t *testing.T, fieldJSON string)
iceberg.NestedField {
+ t.Helper()
+ var field iceberg.NestedField
+ require.NoError(t, json.Unmarshal([]byte(fieldJSON), &field))
+
+ return field
+ }
+
+ tests := []struct {
+ name string
+ fieldJSON string
+ check func(t *testing.T, col arrow.Array)
+ }{
+ {
+ name: "date as float64",
+ fieldJSON:
`{"id":2,"name":"dt","type":"date","required":false,"initial-default":1234}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ arr := col.(*array.Date32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(1234),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "time as float64",
+ fieldJSON:
`{"id":2,"name":"t","type":"time","required":false,"initial-default":5000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIME64,
col.DataType().ID())
+ arr := col.(*array.Time64)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Time64(5000000),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamp as float64",
+ fieldJSON:
`{"id":2,"name":"ts","type":"timestamp","required":false,"initial-default":1700000000000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamptz as float64",
+ fieldJSON:
`{"id":2,"name":"ts","type":"timestamptz","required":false,"initial-default":1700000000000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamp_ns as float64",
+ fieldJSON:
`{"id":2,"name":"ts","type":"timestamp_ns","required":false,"initial-default":1700000000000000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamptz_ns as float64",
+ fieldJSON:
`{"id":2,"name":"ts","type":"timestamptz_ns","required":false,"initial-default":1700000000000000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "uuid as string",
+ fieldJSON:
`{"id":2,"name":"uid","type":"uuid","required":false,"initial-default":"f79c3e09-677c-4bbd-a479-512f87f77acf"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.EXTENSION,
col.DataType().ID())
+ arr := col.(*extensions.UUIDArray)
+ expected :=
uuid.MustParse("f79c3e09-677c-4bbd-a479-512f87f77acf")
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, expected, arr.Value(i),
"row %d", i)
+ }
+ },
+ },
+ {
+ name: "decimal as string",
+ fieldJSON: `{"id":2,"name":"price","type":"decimal(10,
2)","required":false,"initial-default":"123.45"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DECIMAL128,
col.DataType().ID())
+ arr := col.(*array.Decimal128)
+ expected := decimal128.FromI64(12345)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, expected, arr.Value(i),
"row %d", i)
+ }
+ },
+ },
+ {
+ name: "binary as base64 string",
+ fieldJSON:
`{"id":2,"name":"data","type":"binary","required":false,"initial-default":"` +
base64.StdEncoding.EncodeToString([]byte("hello")) + `"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.BINARY,
col.DataType().ID())
+ arr := col.(*array.Binary)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, []byte("hello"),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "fixed as base64 string",
+ fieldJSON:
`{"id":2,"name":"data","type":"fixed[5]","required":false,"initial-default":"`
+ base64.StdEncoding.EncodeToString([]byte("hello")) + `"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.FIXED_SIZE_BINARY,
col.DataType().ID())
+ arr := col.(*array.FixedSizeBinary)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, []byte("hello"),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "bool",
+ fieldJSON:
`{"id":2,"name":"flag","type":"boolean","required":false,"initial-default":true}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.BOOL,
col.DataType().ID())
+ arr := col.(*array.Boolean)
+ for i := 0; i < arr.Len(); i++ {
+ assert.True(t, arr.Value(i), "row %d",
i)
+ }
+ },
+ },
+ {
+ name: "int as float64",
+ fieldJSON:
`{"id":2,"name":"n","type":"int","required":false,"initial-default":42}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.INT32,
col.DataType().ID())
+ arr := col.(*array.Int32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, int32(42),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "long as float64",
+ fieldJSON:
`{"id":2,"name":"n","type":"long","required":false,"initial-default":42}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.INT64,
col.DataType().ID())
+ arr := col.(*array.Int64)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, int64(42),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "float as float64",
+ fieldJSON:
`{"id":2,"name":"n","type":"float","required":false,"initial-default":3.14}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.FLOAT32,
col.DataType().ID())
+ arr := col.(*array.Float32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.InDelta(t, float32(3.14),
arr.Value(i), 0.001, "row %d", i)
+ }
+ },
+ },
+ {
+ name: "double as float64",
+ fieldJSON:
`{"id":2,"name":"n","type":"double","required":false,"initial-default":3.14}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.FLOAT64,
col.DataType().ID())
+ arr := col.(*array.Float64)
+ for i := 0; i < arr.Len(); i++ {
+ assert.InDelta(t, float64(3.14),
arr.Value(i), 0.0001, "row %d", i)
+ }
+ },
+ },
+ {
+ name: "string",
+ fieldJSON:
`{"id":2,"name":"s","type":"string","required":false,"initial-default":"hello"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ arr := col.(*array.String)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, "hello", arr.Value(i),
"row %d", i)
+ }
+ },
+ },
+ {
+ // both defaults present — read path must use
initial-default
+ name: "prefers initial-default over write-default
on read path",
+ fieldJSON:
`{"id":2,"name":"dt","type":"date","required":false,"initial-default":100,"write-default":999}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ arr := col.(*array.Date32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(100),
arr.Value(i), "row %d: should use initial-default (100), not write-default
(999)", i)
+ }
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ mem :=
memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ field := unmarshalField(t, tt.fieldJSON)
+
+ requestedIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ field,
+ )
+
+ rec := buildBaseRecord(mem)
+ defer rec.Release()
+
+ result, err := table.ToRequestedSchema(ctx,
requestedIceSchema, fileIceSchema, rec, table.SchemaOptions{})
+ require.NoError(t, err)
+ defer result.Release()
+
+ require.EqualValues(t, 2, result.NumCols())
+ tt.check(t, result.Column(1))
+ })
+ }
+}
+
+// TestToRequestedSchemaWriteDefaultJSONRoundTrip verifies that write-default
+// values arriving from a REST catalog or metadata file (where encoding/json
+// decodes numbers as float64 and strings as string) are handled correctly
+// without panicking. Each sub-test unmarshals a NestedField from a raw JSON
+// snippet and projects it through ToRequestedSchema.
+func TestToRequestedSchemaWriteDefaultJSONRoundTrip(t *testing.T) {
+ ctx := context.Background()
+
+ fileIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+
+ buildBaseRecord := func(mem memory.Allocator) arrow.RecordBatch {
+ arrowSchema := arrow.NewSchema([]arrow.Field{
+ {
+ Name: "id",
+ Type: arrow.PrimitiveTypes.Int32,
+ Nullable: false,
+ Metadata:
arrow.MetadataFrom(map[string]string{table.ArrowParquetFieldIDKey: "1"}),
+ },
+ }, nil)
+ bldr := array.NewRecordBuilder(mem, arrowSchema)
+ defer bldr.Release()
+ bldr.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2,
3}, nil)
+
+ return bldr.NewRecordBatch()
+ }
+
+ unmarshalField := func(t *testing.T, fieldJSON string)
iceberg.NestedField {
+ t.Helper()
+ var field iceberg.NestedField
+ require.NoError(t, json.Unmarshal([]byte(fieldJSON), &field))
+
+ return field
+ }
+
+ tests := []struct {
+ name string
+ fieldJSON string
+ check func(t *testing.T, col arrow.Array)
+ }{
+ {
+ name: "date as float64",
+ fieldJSON:
`{"id":2,"name":"dt","type":"date","required":false,"write-default":1234}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DATE32,
col.DataType().ID())
+ arr := col.(*array.Date32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Date32(1234),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "time as float64",
+ fieldJSON:
`{"id":2,"name":"t","type":"time","required":false,"write-default":5000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIME64,
col.DataType().ID())
+ arr := col.(*array.Time64)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, arrow.Time64(5000000),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamp as float64",
+ fieldJSON:
`{"id":2,"name":"ts","type":"timestamp","required":false,"write-default":1700000000000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamptz as float64",
+ fieldJSON:
`{"id":2,"name":"ts","type":"timestamptz","required":false,"write-default":1700000000000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamp_ns as float64",
+ fieldJSON:
`{"id":2,"name":"ts","type":"timestamp_ns","required":false,"write-default":1700000000000000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "uuid as string",
+ fieldJSON:
`{"id":2,"name":"uid","type":"uuid","required":false,"write-default":"f79c3e09-677c-4bbd-a479-512f87f77acf"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.EXTENSION,
col.DataType().ID())
+ arr := col.(*extensions.UUIDArray)
+ expected :=
uuid.MustParse("f79c3e09-677c-4bbd-a479-512f87f77acf")
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, expected, arr.Value(i),
"row %d", i)
+ }
+ },
+ },
+ {
+ name: "decimal as string",
+ fieldJSON: `{"id":2,"name":"price","type":"decimal(10,
2)","required":false,"write-default":"123.45"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.DECIMAL128,
col.DataType().ID())
+ arr := col.(*array.Decimal128)
+ expected := decimal128.FromI64(12345)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, expected, arr.Value(i),
"row %d", i)
+ }
+ },
+ },
+ {
+ name: "binary as base64 string",
+ fieldJSON:
`{"id":2,"name":"data","type":"binary","required":false,"write-default":"` +
base64.StdEncoding.EncodeToString([]byte("hello")) + `"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.BINARY,
col.DataType().ID())
+ arr := col.(*array.Binary)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, []byte("hello"),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "fixed as base64 string",
+ fieldJSON:
`{"id":2,"name":"data","type":"fixed[5]","required":false,"write-default":"` +
base64.StdEncoding.EncodeToString([]byte("hello")) + `"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.FIXED_SIZE_BINARY,
col.DataType().ID())
+ arr := col.(*array.FixedSizeBinary)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, []byte("hello"),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "timestamptz_ns as float64",
+ fieldJSON:
`{"id":2,"name":"ts","type":"timestamptz_ns","required":false,"write-default":1700000000000000000}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.TIMESTAMP,
col.DataType().ID())
+ arr := col.(*array.Timestamp)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t,
arrow.Timestamp(1700000000000000000), arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "bool",
+ fieldJSON:
`{"id":2,"name":"flag","type":"boolean","required":false,"write-default":true}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.BOOL,
col.DataType().ID())
+ arr := col.(*array.Boolean)
+ for i := 0; i < arr.Len(); i++ {
+ assert.True(t, arr.Value(i), "row %d",
i)
+ }
+ },
+ },
+ {
+ name: "int as float64",
+ fieldJSON:
`{"id":2,"name":"n","type":"int","required":false,"write-default":42}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.INT32,
col.DataType().ID())
+ arr := col.(*array.Int32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, int32(42),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "long as float64",
+ fieldJSON:
`{"id":2,"name":"n","type":"long","required":false,"write-default":42}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.INT64,
col.DataType().ID())
+ arr := col.(*array.Int64)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, int64(42),
arr.Value(i), "row %d", i)
+ }
+ },
+ },
+ {
+ name: "float as float64",
+ fieldJSON:
`{"id":2,"name":"n","type":"float","required":false,"write-default":3.14}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.FLOAT32,
col.DataType().ID())
+ arr := col.(*array.Float32)
+ for i := 0; i < arr.Len(); i++ {
+ assert.InDelta(t, float32(3.14),
arr.Value(i), 0.001, "row %d", i)
+ }
+ },
+ },
+ {
+ name: "double as float64",
+ fieldJSON:
`{"id":2,"name":"n","type":"double","required":false,"write-default":3.14}`,
+ check: func(t *testing.T, col arrow.Array) {
+ require.Equal(t, arrow.FLOAT64,
col.DataType().ID())
+ arr := col.(*array.Float64)
+ for i := 0; i < arr.Len(); i++ {
+ assert.InDelta(t, float64(3.14),
arr.Value(i), 0.0001, "row %d", i)
+ }
+ },
+ },
+ {
+ name: "string",
+ fieldJSON:
`{"id":2,"name":"s","type":"string","required":false,"write-default":"hello"}`,
+ check: func(t *testing.T, col arrow.Array) {
+ arr := col.(*array.String)
+ for i := 0; i < arr.Len(); i++ {
+ assert.Equal(t, "hello", arr.Value(i),
"row %d", i)
+ }
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ mem :=
memory.NewCheckedAllocator(memory.NewGoAllocator())
+ defer mem.AssertSize(t, 0)
+
+ field := unmarshalField(t, tt.fieldJSON)
+
+ requestedIceSchema := iceberg.NewSchema(0,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ field,
+ )
+
+ rec := buildBaseRecord(mem)
+ defer rec.Release()
+
+ result, err := table.ToRequestedSchema(ctx,
requestedIceSchema, fileIceSchema, rec, table.SchemaOptions{UseWriteDefault:
true})
+ require.NoError(t, err)
+ defer result.Release()
+
+ require.EqualValues(t, 2, result.NumCols())
+ tt.check(t, result.Column(1))
+ })
+ }
+}
diff --git a/table/metadata_schema_compatibility.go
b/table/metadata_schema_compatibility.go
index 7d58a29d..1d3f76ee 100644
--- a/table/metadata_schema_compatibility.go
+++ b/table/metadata_schema_compatibility.go
@@ -36,10 +36,10 @@ func (e ErrIncompatibleSchema) Error() string {
var problems strings.Builder
for _, f := range e.fields {
if f.UnsupportedType != nil {
- problems.WriteString(fmt.Sprintf("\n- invalid type for
%s: %s is not supported until v%d", f.ColName, f.Field.Type,
f.UnsupportedType.MinFormatVersion))
+ fmt.Fprintf(&problems, "\n- invalid type for %s: %s is
not supported until v%d", f.ColName, f.Field.Type,
f.UnsupportedType.MinFormatVersion)
}
if f.InvalidDefault != nil {
- problems.WriteString(fmt.Sprintf("\n- invalid initial
default for %s: non-null default (%v) is not supported until v%d", f.ColName,
f.Field.InitialDefault, f.InvalidDefault.MinFormatVersion))
+ fmt.Fprintf(&problems, "\n- invalid initial default for
%s: non-null default (%v) is not supported until v%d", f.ColName,
f.Field.InitialDefault, f.InvalidDefault.MinFormatVersion)
}
}
diff --git a/table/rolling_data_writer.go b/table/rolling_data_writer.go
index 2477705e..5462e0e3 100644
--- a/table/rolling_data_writer.go
+++ b/table/rolling_data_writer.go
@@ -332,7 +332,7 @@ func (r *RollingDataWriter) stream(outputDataFilesCh chan<-
iceberg.DataFile) {
}
converted, err := ToRequestedSchema(r.ctx, r.factory.fileSchema,
- r.factory.taskSchema, record, false, true, false)
+ r.factory.taskSchema, record,
SchemaOptions{IncludeFieldIDs: true, UseWriteDefault: true})
if err != nil {
record.Release()
r.sendError(err)
diff --git a/table/writer.go b/table/writer.go
index 32e39dbe..92b437fa 100644
--- a/table/writer.go
+++ b/table/writer.go
@@ -118,7 +118,7 @@ func (w *defaultDataFileWriter) writeFile(ctx
context.Context, partitionValues m
batches := make([]arrow.RecordBatch, len(task.Batches))
for i, b := range task.Batches {
rec, err := ToRequestedSchema(ctx, w.fileSchema,
- task.Schema, b, false, true, false)
+ task.Schema, b, SchemaOptions{IncludeFieldIDs: true,
UseWriteDefault: true})
if err != nil {
return nil, err
}