zeroshade commented on code in PR #735:
URL: https://github.com/apache/iceberg-go/pull/735#discussion_r3001934611
##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ nrows := batch.NumRows()
+
+ // Start from the existing columns; we'll replace the row lineage
columns in-place
+ // when we need to synthesize values.
+ newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ var toRelease []arrow.Array
+
+ // _row_id: inherit first_row_id + row_position when null; else keep
value from file.
+ if rowIDColIdx >= 0 && task.FirstRowID != nil {
+ if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset + k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+
+ arr := bldr.NewArray()
+ newCols[rowIDColIdx] = arr
+ toRelease = append(toRelease, arr)
+ }
+ }
+
+ // _last_updated_sequence_number: inherit file's data_sequence_number
when null; else keep value from file.
+ if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+ if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
Review Comment:
As above, reuse the hoisted Int64 builder here instead of allocating a second one.
##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ nrows := batch.NumRows()
+
+ // Start from the existing columns; we'll replace the row lineage
columns in-place
+ // when we need to synthesize values.
+ newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ var toRelease []arrow.Array
+
+ // _row_id: inherit first_row_id + row_position when null; else keep
value from file.
+ if rowIDColIdx >= 0 && task.FirstRowID != nil {
+ if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
Review Comment:
We can reuse this builder: `NewArray` resets it, so one instance can serve both columns. Hoist its creation out of the `if` blocks so it looks more like:
```go
bldr := array.NewInt64Builder(alloc)
defer bldr.Release()
if rowIDColIdx >= 0 && task.FirstRowID != nil {
if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
...
}
}
...
```
##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ nrows := batch.NumRows()
+
+ // Start from the existing columns; we'll replace the row lineage
columns in-place
+ // when we need to synthesize values.
+ newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ var toRelease []arrow.Array
+
+ // _row_id: inherit first_row_id + row_position when null; else keep
value from file.
+ if rowIDColIdx >= 0 && task.FirstRowID != nil {
+ if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset + k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+
+ arr := bldr.NewArray()
+ newCols[rowIDColIdx] = arr
+ toRelease = append(toRelease, arr)
+ }
+ }
+
+ // _last_updated_sequence_number: inherit file's data_sequence_number
when null; else keep value from file.
+ if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+ if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ seq := *task.DataSequenceNumber
+ for k := int64(0); k < nrows; k++ {
Review Comment:
```suggestion
for k := range nrows {
```
##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ nrows := batch.NumRows()
+
+ // Start from the existing columns; we'll replace the row lineage
columns in-place
+ // when we need to synthesize values.
+ newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ var toRelease []arrow.Array
Review Comment:
Instead of collecting the new arrays in `toRelease`, call `defer arr.Release()`
immediately after assigning each new array into `newCols`.
##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ nrows := batch.NumRows()
+
+ // Start from the existing columns; we'll replace the row lineage
columns in-place
+ // when we need to synthesize values.
+ newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ var toRelease []arrow.Array
+
+ // _row_id: inherit first_row_id + row_position when null; else keep
value from file.
+ if rowIDColIdx >= 0 && task.FirstRowID != nil {
+ if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset + k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+
+ arr := bldr.NewArray()
+ newCols[rowIDColIdx] = arr
+ toRelease = append(toRelease, arr)
+ }
+ }
+
+ // _last_updated_sequence_number: inherit file's data_sequence_number
when null; else keep value from file.
+ if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+ if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ seq := *task.DataSequenceNumber
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(seq)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+
+ arr := bldr.NewArray()
+ newCols[seqNumColIdx] = arr
+ toRelease = append(toRelease, arr)
Review Comment:
```suggestion
defer arr.Release()
```
##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ nrows := batch.NumRows()
+
+ // Start from the existing columns; we'll replace the row lineage
columns in-place
+ // when we need to synthesize values.
+ newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ var toRelease []arrow.Array
+
+ // _row_id: inherit first_row_id + row_position when null; else keep
value from file.
+ if rowIDColIdx >= 0 && task.FirstRowID != nil {
+ if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
Review Comment:
```suggestion
for k := range nrows {
```
##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ nrows := batch.NumRows()
+
+ // Start from the existing columns; we'll replace the row lineage
columns in-place
+ // when we need to synthesize values.
+ newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ var toRelease []arrow.Array
+
+ // _row_id: inherit first_row_id + row_position when null; else keep
value from file.
+ if rowIDColIdx >= 0 && task.FirstRowID != nil {
+ if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset + k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+
+ arr := bldr.NewArray()
+ newCols[rowIDColIdx] = arr
+ toRelease = append(toRelease, arr)
Review Comment:
```suggestion
defer arr.Release()
```
##########
table/arrow_scanner.go:
##########
@@ -385,6 +386,95 @@ func (as *arrowScan) getRecordFilter(ctx context.Context,
fileSchema *iceberg.Sc
return nil, false, nil
}
+// synthesizeRowLineageColumns fills _row_id and _last_updated_sequence_number
from task constants
+// when those columns are present in the batch (e.g. from ToRequestedSchema).
Per the Iceberg v3
+// row lineage spec: if the value is null in the file, it is inherited
(synthesized) from the file's
+// first_row_id and data_sequence_number; otherwise the value from the file is
kept.
+// rowOffset is the 0-based row index within the current file and is updated
so _row_id stays
+// correct across multiple batches from the same file (first_row_id +
row_position).
+func synthesizeRowLineageColumns(
+ ctx context.Context,
+ rowOffset *int64,
+ task FileScanTask,
+ batch arrow.RecordBatch,
+) (arrow.RecordBatch, error) {
+ alloc := compute.GetAllocator(ctx)
+ schema := batch.Schema()
+ nrows := batch.NumRows()
+
+ // Start from the existing columns; we'll replace the row lineage
columns in-place
+ // when we need to synthesize values.
+ newCols := append([]arrow.Array(nil), batch.Columns()...)
+
+ // Resolve column indices by name; -1 if not present.
+ rowIDIndices := schema.FieldIndices(iceberg.RowIDColumnName)
+ seqNumIndices :=
schema.FieldIndices(iceberg.LastUpdatedSequenceNumberColumnName)
+ rowIDColIdx := -1
+ if len(rowIDIndices) > 0 {
+ rowIDColIdx = rowIDIndices[0]
+ }
+ seqNumColIdx := -1
+ if len(seqNumIndices) > 0 {
+ seqNumColIdx = seqNumIndices[0]
+ }
+
+ var toRelease []arrow.Array
+
+ // _row_id: inherit first_row_id + row_position when null; else keep
value from file.
+ if rowIDColIdx >= 0 && task.FirstRowID != nil {
+ if col, ok := newCols[rowIDColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ first := *task.FirstRowID
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(first + *rowOffset + k)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+
+ arr := bldr.NewArray()
+ newCols[rowIDColIdx] = arr
+ toRelease = append(toRelease, arr)
+ }
+ }
+
+ // _last_updated_sequence_number: inherit file's data_sequence_number
when null; else keep value from file.
+ if seqNumColIdx >= 0 && task.DataSequenceNumber != nil {
+ if col, ok := newCols[seqNumColIdx].(*array.Int64); ok {
+ bldr := array.NewInt64Builder(alloc)
+ defer bldr.Release()
+
+ bldr.Reserve(int(nrows))
+ seq := *task.DataSequenceNumber
+ for k := int64(0); k < nrows; k++ {
+ if col.IsNull(int(k)) {
+ bldr.Append(seq)
+ } else {
+ bldr.Append(col.Value(int(k)))
+ }
+ }
+
+ arr := bldr.NewArray()
+ newCols[seqNumColIdx] = arr
+ toRelease = append(toRelease, arr)
+ }
+ }
+
+ // Advance so the next batch from this file uses the correct row
position for _row_id.
+ *rowOffset += nrows
+
+ rec := array.NewRecordBatch(schema, newCols, nrows)
+ for _, c := range toRelease {
+ c.Release()
+ }
Review Comment:
This release loop is no longer needed if you `defer arr.Release()` immediately
after each call to `NewArray`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]