This is an automated email from the ASF dual-hosted git repository.

zeroshade pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 3c01a046 feat(table): add Scan.ReadTasks for reading pre-planned file scan tasks (#781)
3c01a046 is described below

commit 3c01a04602af53cb77e18c479aa3adfb654f9209
Author: Tobias Pütz <[email protected]>
AuthorDate: Tue Mar 17 17:56:56 2026 +0100

    feat(table): add Scan.ReadTasks for reading pre-planned file scan tasks (#781)
    
    This extracts the read logic from ToArrowRecords into a new exported
    ReadTasks method, which ToArrowRecords now calls after planning. This
    enables callers that already have FileScanTasks to read them without
    re-planning.
    
    Currently, ToArrowRecords always goes through PlanFiles first. If you've
    already selected or filtered your tasks (e.g. for compaction, selective
    re-reads, or custom merge strategies), there's no way to read them
    directly without planning again.
---
 table/scanner.go      | 13 ++++++++-
 table/scanner_test.go | 80 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/table/scanner.go b/table/scanner.go
index bc3c0902..0d1a0dbd 100644
--- a/table/scanner.go
+++ b/table/scanner.go
@@ -505,7 +505,18 @@ func (scan *Scan) ToArrowRecords(ctx context.Context) (*arrow.Schema, iter.Seq2[
                return nil, nil, err
        }
 
-       var boundFilter iceberg.BooleanExpression
+       return scan.ReadTasks(ctx, tasks)
+}
+
+// ReadTasks reads Arrow records from a specific set of FileScanTasks, applying the
+// scan's projection, row filters, and positional delete handling. This is useful when
+// the caller has already planned or selected specific tasks to read.
+func (scan *Scan) ReadTasks(ctx context.Context, tasks []FileScanTask) 
(*arrow.Schema, iter.Seq2[arrow.RecordBatch, error], error) {
+       var (
+               boundFilter iceberg.BooleanExpression
+               err         error
+       )
+
        if scan.rowFilter != nil {
                boundFilter, err = 
iceberg.BindExpr(scan.metadata.CurrentSchema(), scan.rowFilter, 
scan.caseSensitive)
                if err != nil {
diff --git a/table/scanner_test.go b/table/scanner_test.go
index 6554c134..11c8e205 100644
--- a/table/scanner_test.go
+++ b/table/scanner_test.go
@@ -319,6 +319,86 @@ func (s *ScannerSuite) TestScannerRecordsDeletes() {
        }
 }
 
+func (s *ScannerSuite) TestReadTasks() {
+       mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
+       defer mem.AssertSize(s.T(), 0)
+
+       ident := catalog.ToIdentifier("default", "test_positional_mor_deletes")
+
+       tbl, err := s.cat.LoadTable(s.ctx, ident)
+       s.Require().NoError(err)
+
+       expectedSchema := arrow.NewSchema([]arrow.Field{
+               {Name: "number", Type: arrow.PrimitiveTypes.Int32, Nullable: 
true},
+       }, nil)
+
+       ref := iceberg.Reference("letter")
+
+       tests := []struct {
+               name     string
+               filter   iceberg.BooleanExpression
+               rowLimit int64
+               expected string
+       }{
+               {
+                       "all",
+                       iceberg.AlwaysTrue{},
+                       table.ScanNoLimit,
+                       `[1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]`,
+               },
+               {"filter", iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+                       iceberg.LessThan(ref, "k")), table.ScanNoLimit, `[5, 6, 
7, 8, 10]`},
+               {"filter and limit", 
iceberg.NewAnd(iceberg.GreaterThanEqual(ref, "e"),
+                       iceberg.LessThan(ref, "k")), 1, `[5]`},
+               {"limit", nil, 3, `[1, 2, 3]`},
+       }
+
+       for _, tt := range tests {
+               s.Run(tt.name, func() {
+                       scopedMem := memory.NewCheckedAllocatorScope(mem)
+                       defer scopedMem.CheckSize(s.T())
+
+                       ctx := compute.WithAllocator(s.ctx, mem)
+
+                       scan := tbl.Scan(table.WithRowFilter(tt.filter),
+                               table.WithSelectedFields("number"))
+                       tasks, err := scan.PlanFiles(ctx)
+                       s.Require().NoError(err)
+
+                       s.Len(tasks, 1)
+                       s.Len(tasks[0].DeleteFiles, 1)
+
+                       _, itr, err := 
scan.UseRowLimit(tt.rowLimit).ReadTasks(ctx, tasks)
+                       s.Require().NoError(err)
+
+                       next, stop := iter.Pull2(itr)
+                       defer stop()
+
+                       rec, err, valid := next()
+                       s.Require().True(valid)
+                       s.Require().NoError(err)
+                       defer rec.Release()
+
+                       s.True(expectedSchema.Equal(rec.Schema()), "expected: 
%s\ngot: %s\n",
+                               expectedSchema, rec.Schema())
+
+                       arr, _, err := array.FromJSON(mem, 
arrow.PrimitiveTypes.Int32,
+                               strings.NewReader(tt.expected))
+                       s.Require().NoError(err)
+                       defer arr.Release()
+
+                       expectedResult := array.NewRecord(expectedSchema, 
[]arrow.Array{arr}, int64(arr.Len()))
+                       defer expectedResult.Release()
+
+                       s.True(array.RecordEqual(expectedResult, rec), 
"expected: %s\ngot: %s\n", expectedResult, rec)
+
+                       _, err, valid = next()
+                       s.Require().NoError(err)
+                       s.Require().False(valid)
+               })
+       }
+}
+
 func (s *ScannerSuite) TestScannerRecordsDoubleDeletes() {
        // number, letter
        //  (1, 'a'),

Reply via email to