This is an automated email from the ASF dual-hosted git repository.
laskoviymishka pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git
The following commit(s) were added to refs/heads/main by this push:
new 1a446112 feat(table): project reserved row-lineage fields as null when
the file lacks them (#1045)
1a446112 is described below
commit 1a446112ccb7a3453853ab5c5f10c7aa24cfb945
Author: David Zhao <[email protected]>
AuthorDate: Mon May 18 23:36:57 2026 +0200
feat(table): project reserved row-lineage fields as null when the file
lacks them (#1045)
fixes #1010.
---------
Signed-off-by: happydave1 <[email protected]>
---
schema_test.go | 36 ++++
table/scanner.go | 95 +++++++++++
table/scanner_internal_test.go | 369 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 500 insertions(+)
diff --git a/schema_test.go b/schema_test.go
index 0ae676b3..b0919ede 100644
--- a/schema_test.go
+++ b/schema_test.go
@@ -1035,3 +1035,39 @@ func TestSanitizeColumnNamesEmptyFieldName(t *testing.T)
{
assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
assert.ErrorContains(t, err, "field name cannot be empty")
}
+
+func TestSchemaSelectCaseSensitiveSuccess(t *testing.T) {
+ selected, err := tableSchemaSimple.Select(true, "foo", "bar")
+ require.NoError(t, err)
+
+ expected := iceberg.NewSchemaWithIdentifiers(1, []int{2},
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+ assert.Truef(t, selected.Equals(expected), "expected: %s\ngot: %s",
expected, selected)
+}
+
+func TestSchemaSelectCaseSensitiveNameMismatch(t *testing.T) {
+ _, err := tableSchemaSimple.Select(true, "FOO")
+ require.Error(t, err)
+ assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
+ assert.ErrorContains(t, err, "could not find column FOO")
+}
+
+func TestSchemaSelectCaseInsensitiveSuccess(t *testing.T) {
+ selected, err := tableSchemaSimple.Select(false, "FOO", "BaR")
+ require.NoError(t, err)
+
+ expected := iceberg.NewSchemaWithIdentifiers(1, []int{2},
+ iceberg.NestedField{ID: 1, Name: "foo", Type:
iceberg.PrimitiveTypes.String},
+ iceberg.NestedField{ID: 2, Name: "bar", Type:
iceberg.PrimitiveTypes.Int32, Required: true},
+ )
+ assert.Truef(t, selected.Equals(expected), "expected: %s\ngot: %s",
expected, selected)
+}
+
+func TestSchemaSelectCaseInsensitiveMissingColumn(t *testing.T) {
+ _, err := tableSchemaSimple.Select(false, "missing_col")
+ require.Error(t, err)
+ assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
+ assert.ErrorContains(t, err, "could not find column missing_col")
+}
diff --git a/table/scanner.go b/table/scanner.go
index 7263c028..b5a9b15d 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -24,6 +24,7 @@ import (
"iter"
"math"
"slices"
+ "strings"
"sync"
"github.com/apache/arrow-go/v18/arrow"
@@ -241,6 +242,8 @@ func (scan *Scan) Snapshot() *Snapshot {
func (scan *Scan) Projection() (*iceberg.Schema, error) {
curSchema := scan.metadata.CurrentSchema()
+ curVersion := scan.metadata.Version()
+ caseSensitive := scan.caseSensitive
if scan.snapshotID != nil {
snap := scan.metadata.SnapshotByID(*scan.snapshotID)
if snap == nil {
@@ -262,6 +265,21 @@ func (scan *Scan) Projection() (*iceberg.Schema, error) {
return curSchema, nil
}
+ selectedFieldsMeta := metaFieldsFromSelectedFields(scan.selectedFields,
caseSensitive)
+ schemaMeta := metaFieldsFromSchema(curSchema)
+ synthesisMeta := synthesizeMeta(selectedFieldsMeta, schemaMeta)
+ if len(synthesisMeta) > 0 && curVersion >= minFormatVersionRowLineage {
+
+ // synthesis path
+ removedMetaSlice, missingMetaFields :=
removeMetadataFromSelectedFields(scan.selectedFields, synthesisMeta)
+ sch, err := curSchema.Select(scan.caseSensitive,
removedMetaSlice...)
+ if err != nil {
+ return nil, err
+ }
+
+ return iceberg.NewSchemaWithIdentifiers(sch.ID,
sch.IdentifierFieldIDs, append(sch.Fields(), missingMetaFields...)...), nil
+ }
+
return curSchema.Select(scan.caseSensitive, scan.selectedFields...)
}
@@ -674,3 +692,80 @@ func (scan *Scan) ToArrowTable(ctx context.Context)
(arrow.Table, error) {
return array.NewTableFromRecords(schema, records), nil
}
+
+// Removes metaFields from selectedFields if they are present. Returns a []string
representing the filtered selectedFields
+// and a []iceberg.NestedField representing the removed metadata. Note that
metaFields is passed in
+// after being validated by metaFieldsFromSelectedFields.
+func removeMetadataFromSelectedFields(selectedFields []string, metaFields
[]string) ([]string, []iceberg.NestedField) {
+ filteredFields := []string{}
+ meta := []iceberg.NestedField{}
+
+ for _, field := range selectedFields {
+ if slices.Contains(metaFields, strings.ToLower(field)) {
+
+ switch strings.ToLower(field) {
+ case iceberg.LastUpdatedSequenceNumberColumnName:
+ meta = append(meta,
iceberg.LastUpdatedSequenceNumber())
+ case iceberg.RowIDColumnName:
+ meta = append(meta, iceberg.RowID())
+ }
+
+ continue
+ }
+
+ filteredFields = append(filteredFields, field)
+ }
+
+ return filteredFields, meta
+}
+
+func metaFieldsFromSelectedFields(selectedFields []string, caseSensitive bool)
[]string {
+ meta := []string{}
+ if !caseSensitive {
+ for _, field := range selectedFields {
+ if strings.EqualFold(field, iceberg.RowIDColumnName) ||
strings.EqualFold(field, iceberg.LastUpdatedSequenceNumberColumnName) {
+ meta = append(meta, strings.ToLower(field))
+ }
+ }
+
+ return meta
+ }
+
+ for _, field := range selectedFields {
+ if field == iceberg.RowIDColumnName || field ==
iceberg.LastUpdatedSequenceNumberColumnName {
+ meta = append(meta, strings.ToLower(field))
+ }
+ }
+
+ return meta
+}
+
+// Takes in a *iceberg.Schema and returns a []string with the names of the row
lineage metadata columns present
+// in the schema.
+func metaFieldsFromSchema(sch *iceberg.Schema) []string {
+ meta := []string{}
+ _, hasRowIDMeta := sch.FindFieldByName(iceberg.RowIDColumnName)
+ _, hasSeqMeta :=
sch.FindFieldByName(iceberg.LastUpdatedSequenceNumberColumnName)
+
+ if hasRowIDMeta {
+ meta = append(meta, iceberg.RowIDColumnName)
+ }
+ if hasSeqMeta {
+ meta = append(meta, iceberg.LastUpdatedSequenceNumberColumnName)
+ }
+
+ return meta
+}
+
+// Any metadata field that is in selectedFieldsMeta but not in schemaMeta must be
synthesized
+func synthesizeMeta(selectedFieldsMeta []string, schemaMeta []string) []string
{
+ synthesis := []string{}
+
+ for _, f := range selectedFieldsMeta {
+ if !slices.Contains(schemaMeta, f) {
+ synthesis = append(synthesis, f)
+ }
+ }
+
+ return synthesis
+}
diff --git a/table/scanner_internal_test.go b/table/scanner_internal_test.go
index d5808728..789d9906 100644
--- a/table/scanner_internal_test.go
+++ b/table/scanner_internal_test.go
@@ -19,6 +19,8 @@ package table
import (
"runtime"
+ "slices"
+ "strconv"
"sync"
"sync/atomic"
"testing"
@@ -236,6 +238,330 @@ func TestBuildPartitionEvaluatorWithInvalidSpecID(t
*testing.T) {
assert.ErrorContains(t, err, "id 999")
}
+// TestProjectionV3PreLineageFile verifies that Projection() succeeds and
returns
+// _row_id and _last_updated_sequence_number as nullable (all-null-capable)
fields when
+// the table is v3 with next-row-id set but the data file predates row lineage
(those
+// columns are absent from the schema).
+func TestProjectionV3PreLineageFile(t *testing.T) {
+ schema := iceberg.NewSchema(
+ 1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+
+ metadata, err := NewMetadata(
+ schema,
+ iceberg.UnpartitionedSpec,
+ UnsortedSortOrder,
+ "s3://test-bucket/test_table",
+ iceberg.Properties{"format-version": "3"},
+ )
+ require.NoError(t, err)
+ assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+ // Request the two user columns plus both row-lineage metadata columns.
+ // These metadata columns do NOT exist in the physical schema of a
pre-lineage file.
+ scan := &Scan{
+ metadata: metadata,
+ selectedFields: []string{"id", "payload",
iceberg.RowIDColumnName, iceberg.LastUpdatedSequenceNumberColumnName},
+ caseSensitive: true,
+ }
+
+ proj, err := scan.Projection()
+ require.NoError(t, err, "Projection must not error for pre-lineage
metadata columns")
+ require.NotNil(t, proj)
+
+ fields := proj.Fields()
+ require.Len(t, fields, 4, "projected schema must contain all four
requested fields")
+
+ fieldByName := make(map[string]iceberg.NestedField, len(fields))
+ for _, f := range fields {
+ fieldByName[f.Name] = f
+ }
+
+ // Regular columns must survive unchanged.
+ idField, ok := fieldByName["id"]
+ require.True(t, ok, "id must be in projection")
+ assert.Equal(t, 1, idField.ID)
+
+ payloadField, ok := fieldByName["payload"]
+ require.True(t, ok, "payload must be in projection")
+ assert.Equal(t, 2, payloadField.ID)
+
+ // Row lineage columns must be present as optional (nullable) fields —
the scanner
+ // will return all-nulls for any data file that was written before row
lineage existed.
+ rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+ require.True(t, ok, "_row_id must be in projection")
+ assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
+ assert.False(t, rowIDField.Required, "_row_id must be optional
(nullable) for pre-lineage files")
+
+ seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+ require.True(t, ok, "_last_updated_sequence_number must be in
projection")
+ assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID,
"_last_updated_sequence_number field ID")
+ assert.False(t, seqField.Required, "_last_updated_sequence_number must
be optional (nullable) for pre-lineage files")
+}
+
+func TestProjectionV3PreLineageFileCaseSensitive(t *testing.T) {
+ schema := iceberg.NewSchema(
+ 1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+
+ metadata, err := NewMetadata(
+ schema,
+ iceberg.UnpartitionedSpec,
+ UnsortedSortOrder,
+ "s3://test-bucket/test_table",
+ iceberg.Properties{"format-version": "3"},
+ )
+ require.NoError(t, err)
+ assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+ scan := &Scan{
+ metadata: metadata,
+ selectedFields: []string{"id", "payload", "_Row_Id"},
+ caseSensitive: true,
+ }
+
+ _, err = scan.Projection()
+ require.Error(t, err)
+ require.ErrorContains(t, err, "could not find column _Row_Id")
+}
+
+func TestProjectionV3PreLineageFileCaseInsensitive(t *testing.T) {
+ schema := iceberg.NewSchema(
+ 1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+
+ metadata, err := NewMetadata(
+ schema,
+ iceberg.UnpartitionedSpec,
+ UnsortedSortOrder,
+ "s3://test-bucket/test_table",
+ iceberg.Properties{"format-version": "3"},
+ )
+ require.NoError(t, err)
+ assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+ scan := &Scan{
+ metadata: metadata,
+ selectedFields: []string{"id", "payload", "_Row_Id",
"_Last_Updated_SEQUENCE_number"},
+ caseSensitive: false,
+ }
+
+ proj, err := scan.Projection()
+ require.NoError(t, err)
+ require.NotNil(t, proj)
+
+ fields := proj.Fields()
+ require.Len(t, fields, 4, "projected schema must contain all four
requested fields")
+
+ fieldByName := make(map[string]iceberg.NestedField, len(fields))
+ for _, f := range fields {
+ fieldByName[f.Name] = f
+ }
+
+ idField, ok := fieldByName["id"]
+ require.True(t, ok, "id must be in projection")
+ assert.Equal(t, 1, idField.ID)
+
+ payloadField, ok := fieldByName["payload"]
+ require.True(t, ok, "payload must be in projection")
+ assert.Equal(t, 2, payloadField.ID)
+
+ rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+ require.True(t, ok, "_row_id must be in projection")
+ assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID, "_row_id field ID")
+ assert.False(t, rowIDField.Required, "_row_id must be optional
(nullable) for pre-lineage files")
+
+ seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+ require.True(t, ok, "_last_updated_sequence_number must be in
projection")
+ assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID,
"_last_updated_sequence_number field ID")
+ assert.False(t, seqField.Required, "_last_updated_sequence_number must
be optional (nullable) for pre-lineage files")
+}
+
+// TestProjectionV2RowLineage asserts that requesting row-lineage metadata
columns on a v1 or v2
+// table does not use the v3-only synthesis path and fails with ErrInvalidSchema.
+func TestProjectionV2RowLineage(t *testing.T) {
+ schema := iceberg.NewSchema(
+ 1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ )
+
+ for _, tc := range []struct {
+ name string
+ ver int
+ }{
+ {name: "v1", ver: 1},
+ {name: "v2", ver: 2},
+ } {
+ t.Run(tc.name, func(t *testing.T) {
+ metadata, err := NewMetadata(
+ schema,
+ iceberg.UnpartitionedSpec,
+ UnsortedSortOrder,
+ "s3://test-bucket/test_table",
+ iceberg.Properties{PropertyFormatVersion:
strconv.Itoa(tc.ver)},
+ )
+ require.NoError(t, err)
+ assert.Equal(t, tc.ver, metadata.Version(), "sanity:
metadata format version")
+
+ scan := &Scan{
+ metadata: metadata,
+ selectedFields: []string{"id",
iceberg.RowIDColumnName},
+ caseSensitive: true,
+ }
+
+ _, err = scan.Projection()
+ require.Error(t, err)
+ assert.ErrorIs(t, err, iceberg.ErrInvalidSchema)
+ assert.ErrorContains(t, err, iceberg.RowIDColumnName)
+ })
+ }
+}
+
+// TestProjectionV3SchemaWithRowIDOnly covers a v3 table whose schema
+// already declares _row_id (reserved field id) but does not declare
_last_updated_sequence_number.
+func TestProjectionV3SchemaWithRowIDOnly(t *testing.T) {
+ schema := iceberg.NewSchema(
+ 1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ iceberg.RowID(),
+ )
+
+ metadata, err := NewMetadata(
+ schema,
+ iceberg.UnpartitionedSpec,
+ UnsortedSortOrder,
+ "s3://test-bucket/test_table",
+ iceberg.Properties{"format-version": "3"},
+ )
+ require.NoError(t, err)
+ assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+ scan := &Scan{
+ metadata: metadata,
+ selectedFields: []string{
+ "id", "payload",
+ iceberg.RowIDColumnName,
+ iceberg.LastUpdatedSequenceNumberColumnName,
+ },
+ caseSensitive: true,
+ }
+
+ var proj *iceberg.Schema
+ require.NotPanics(t, func() {
+ var perr error
+ proj, perr = scan.Projection()
+ require.NoError(t, perr)
+ })
+ require.NotNil(t, proj)
+
+ fields := proj.Fields()
+ require.Len(t, fields, 4, "projection must include id, payload,
_row_id, _last_updated_sequence_number")
+
+ fieldByName := make(map[string]iceberg.NestedField, len(fields))
+ idsSeen := make(map[int]string, len(fields))
+ for _, f := range fields {
+ if prev, dup := idsSeen[f.ID]; dup {
+ t.Fatalf("duplicate field id %d: %q and %q", f.ID,
prev, f.Name)
+ }
+ idsSeen[f.ID] = f.Name
+ fieldByName[f.Name] = f
+ }
+
+ idField, ok := fieldByName["id"]
+ require.True(t, ok)
+ assert.Equal(t, 1, idField.ID)
+
+ payloadField, ok := fieldByName["payload"]
+ require.True(t, ok)
+ assert.Equal(t, 2, payloadField.ID)
+
+ rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+ require.True(t, ok)
+ assert.NotEqual(t, iceberg.RowIDFieldID, rowIDField.ID) // NewMetadata
reorders schema field numbers
+ assert.False(t, rowIDField.Required)
+
+ seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+ require.True(t, ok)
+ assert.Equal(t, iceberg.LastUpdatedSequenceNumberFieldID, seqField.ID)
+ assert.False(t, seqField.Required)
+}
+
+func TestProjectionV3SchemaWithLastUpdatedSequenceNumberOnly(t *testing.T) {
+ schema := iceberg.NewSchema(
+ 1,
+ iceberg.NestedField{ID: 1, Name: "id", Type:
iceberg.PrimitiveTypes.Int64, Required: true},
+ iceberg.NestedField{ID: 2, Name: "payload", Type:
iceberg.PrimitiveTypes.String, Required: false},
+ iceberg.LastUpdatedSequenceNumber(),
+ )
+
+ metadata, err := NewMetadata(
+ schema,
+ iceberg.UnpartitionedSpec,
+ UnsortedSortOrder,
+ "s3://test-bucket/test_table",
+ iceberg.Properties{"format-version": "3"},
+ )
+ require.NoError(t, err)
+ assert.Equal(t, 3, metadata.Version(), "sanity: must be v3")
+
+ scan := &Scan{
+ metadata: metadata,
+ selectedFields: []string{
+ "id", "payload",
+ iceberg.RowIDColumnName,
+ iceberg.LastUpdatedSequenceNumberColumnName,
+ },
+ caseSensitive: true,
+ }
+
+ var proj *iceberg.Schema
+ require.NotPanics(t, func() {
+ var perr error
+ proj, perr = scan.Projection()
+ require.NoError(t, perr)
+ })
+ require.NotNil(t, proj)
+
+ fields := proj.Fields()
+ require.Len(t, fields, 4, "projection must include id, payload,
_row_id, _last_updated_sequence_number")
+
+ fieldByName := make(map[string]iceberg.NestedField, len(fields))
+ idsSeen := make(map[int]string, len(fields))
+ for _, f := range fields {
+ if prev, dup := idsSeen[f.ID]; dup {
+ t.Fatalf("duplicate field id %d: %q and %q", f.ID,
prev, f.Name)
+ }
+ idsSeen[f.ID] = f.Name
+ fieldByName[f.Name] = f
+ }
+
+ idField, ok := fieldByName["id"]
+ require.True(t, ok)
+ assert.Equal(t, 1, idField.ID)
+
+ payloadField, ok := fieldByName["payload"]
+ require.True(t, ok)
+ assert.Equal(t, 2, payloadField.ID)
+
+ rowIDField, ok := fieldByName[iceberg.RowIDColumnName]
+ require.True(t, ok)
+ assert.Equal(t, iceberg.RowIDFieldID, rowIDField.ID)
+ assert.False(t, rowIDField.Required)
+
+ seqField, ok := fieldByName[iceberg.LastUpdatedSequenceNumberColumnName]
+ require.True(t, ok)
+ assert.NotEqual(t, iceberg.LastUpdatedSequenceNumberFieldID,
seqField.ID) // NewMetadata reorders schema field numbers
+ assert.False(t, seqField.Required)
+}
+
// TestSynthesizeRowLineageColumns verifies that _row_id and
_last_updated_sequence_number
// are filled from task constants when those columns are present and null.
func TestSynthesizeRowLineageColumns(t *testing.T) {
@@ -295,3 +621,46 @@ func TestSynthesizeRowLineageColumns(t *testing.T) {
}
assert.EqualValues(t, 3, rowOffset)
}
+
+func TestRemoveMetadataFromSelectedFields(t *testing.T) {
+ selectedFields := []string{
+ "id",
+ "payload",
+ }
+
+ metaFields := []string{
+ "_row_id",
+ }
+
+ sf, mf := removeMetadataFromSelectedFields(selectedFields, metaFields)
+
+ assert.Equal(t, 2, len(sf))
+ assert.Equal(t, 0, len(mf))
+
+ assert.True(t, slices.Contains(sf, "id"))
+ assert.True(t, slices.Contains(sf, "payload"))
+}
+
+func TestRemoveMetadataFromSelectedFieldsCasing(t *testing.T) {
+ selectedFields := []string{
+ "id",
+ "payload",
+ "_ROW_Id",
+ "lastupdatedsequence_number",
+ }
+
+ metaFields := []string{
+ "_row_id",
+ "_last_updated_sequence_number",
+ }
+
+ sf, mf := removeMetadataFromSelectedFields(selectedFields, metaFields)
+
+ assert.Equal(t, 3, len(sf))
+ assert.Equal(t, 1, len(mf))
+
+ assert.True(t, slices.Contains(sf, "id"))
+ assert.True(t, slices.Contains(sf, "payload"))
+ assert.True(t, slices.Contains(sf, "lastupdatedsequence_number"))
+ assert.True(t, slices.Contains(mf, iceberg.RowID()))
+}