This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 3d2537cd feat(data): add MOR file scan task reader (#657)
3d2537cd is described below

commit 3d2537cd3bebf3c9d5d72961e42e749cc3d42f5e
Author: Gang Wu <[email protected]>
AuthorDate: Fri May 22 15:20:33 2026 +0800

    feat(data): add MOR file scan task reader (#657)
    
    Add FileScanTaskReader as the delete-aware Arrow stream entrypoint for
    file scan tasks, covering both no-delete reader passthrough and
    merge-on-read filtering with position and equality deletes.
    
    Introduce reusable Arrow C Data utilities for stream wrapping and batch
    projection, including ProjectionContext caching and optional Arrow
    compute registration via arrow::RegisterAll. Move Arrow IO registration
    into arrow_register and remove FileScanTask::ToArrow.
    
    Add coverage for projection behavior across nanoarrow and Arrow compute
    paths, plus end-to-end FileScanTaskReader tests for projected reads,
    position deletes, equality deletes, dropped equality fields, and fully
    deleted batches.
---
 src/iceberg/CMakeLists.txt                         |   5 +-
 src/iceberg/arrow/arrow_c_data_util.cc             | 131 ++++++
 .../{arrow_io_register.cc => arrow_register.cc}    |  52 +-
 .../{arrow_io_register.h => arrow_register.h}      |   8 +-
 src/iceberg/arrow_c_data_util.cc                   | 397 ++++++++++++++++
 src/iceberg/arrow_c_data_util_internal.h           | 245 ++++++++++
 src/iceberg/data/delete_filter.cc                  |   6 +-
 src/iceberg/data/file_scan_task_reader.cc          | 235 +++++++++
 src/iceberg/data/file_scan_task_reader.h           |  81 ++++
 src/iceberg/data/meson.build                       |   1 +
 src/iceberg/file_reader.h                          |   2 +-
 src/iceberg/meson.build                            |   2 +
 src/iceberg/table_scan.cc                          | 118 -----
 src/iceberg/table_scan.h                           |  11 -
 src/iceberg/test/CMakeLists.txt                    |   4 +-
 src/iceberg/test/arrow_c_data_util_test.cc         | 194 ++++++++
 src/iceberg/test/file_scan_task_reader_test.cc     | 524 +++++++++++++++++++++
 src/iceberg/test/file_scan_task_test.cc            |  27 +-
 18 files changed, 1877 insertions(+), 166 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 68cacebe..2b02b999 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -18,6 +18,7 @@
 set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
                      "$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
 set(ICEBERG_SOURCES
+    arrow_c_data_util.cc
     arrow_c_data_guard_internal.cc
     catalog/memory/in_memory_catalog.cc
     delete_file_index.cc
@@ -166,6 +167,7 @@ set(ICEBERG_DATA_SOURCES
     data/delete_filter.cc
     data/delete_loader.cc
     data/equality_delete_writer.cc
+    data/file_scan_task_reader.cc
     data/position_delete_writer.cc
     data/writer.cc
     deletes/position_delete_index.cc
@@ -222,9 +224,10 @@ add_subdirectory(util)
 
 if(ICEBERG_BUILD_BUNDLE)
   set(ICEBERG_BUNDLE_SOURCES
+      arrow/arrow_c_data_util.cc
       arrow/arrow_io.cc
       arrow/s3/arrow_s3_file_io.cc
-      arrow/arrow_io_register.cc
+      arrow/arrow_register.cc
       arrow/metadata_column_util.cc
       avro/avro_data_util.cc
       avro/avro_direct_decoder.cc
diff --git a/src/iceberg/arrow/arrow_c_data_util.cc 
b/src/iceberg/arrow/arrow_c_data_util.cc
new file mode 100644
index 00000000..ea52d64c
--- /dev/null
+++ b/src/iceberg/arrow/arrow_c_data_util.cc
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <utility>
+#include <vector>
+
+#include <arrow/array/array_primitive.h>
+#include <arrow/buffer.h>
+#include <arrow/c/bridge.h>
+#include <arrow/compute/api_vector.h>
+#include <arrow/record_batch.h>
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+struct ArrowProjectBatchState {
+  std::shared_ptr<::arrow::Schema> input_schema;
+  std::shared_ptr<::arrow::Schema> output_schema;
+};
+
+Result<std::shared_ptr<::arrow::Schema>> ImportArrowSchema(
+    const ArrowSchema& arrow_schema) {
+  ArrowSchema schema_copy;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowSchemaDeepCopy(&arrow_schema, 
&schema_copy));
+  internal::ArrowSchemaGuard schema_copy_guard(&schema_copy);
+
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(auto schema, 
::arrow::ImportSchema(&schema_copy));
+  return schema;
+}
+
+Result<std::shared_ptr<ArrowProjectBatchState>> GetArrowProjectBatchState(
+    ProjectionContext& projection) {
+  auto state =
+      
std::static_pointer_cast<ArrowProjectBatchState>(projection.project_batch_state());
+  if (state != nullptr) {
+    return state;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto input_schema,
+                          ImportArrowSchema(projection.input_arrow_schema()));
+  ICEBERG_ASSIGN_OR_RAISE(auto output_schema,
+                          ImportArrowSchema(projection.output_arrow_schema()));
+
+  state = std::make_shared<ArrowProjectBatchState>(
+      ArrowProjectBatchState{.input_schema = std::move(input_schema),
+                             .output_schema = std::move(output_schema)});
+  projection.project_batch_state() = state;
+  return state;
+}
+
+Result<ArrowArray> ProjectBatchArrowCompute(ArrowArray* input_batch,
+                                            std::span<const int32_t> 
row_indices,
+                                            ProjectionContext& projection) {
+  ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
+  ICEBERG_ASSIGN_OR_RAISE(auto state, GetArrowProjectBatchState(projection));
+
+  ICEBERG_ARROW_ASSIGN_OR_RETURN(
+      auto input_record_batch,
+      ::arrow::ImportRecordBatch(input_batch, state->input_schema));
+
+  const int32_t empty_index = 0;
+  // Buffer::Wrap needs a valid pointer even when the zero-length buffer is 
never read.
+  const int32_t* row_indices_data =
+      row_indices.empty() ? &empty_index : row_indices.data();
+  auto index_array = std::make_shared<::arrow::Int32Array>(
+      static_cast<int64_t>(row_indices.size()),
+      ::arrow::Buffer::Wrap(row_indices_data, row_indices.size()));
+
+  std::vector<std::shared_ptr<::arrow::Array>> output_columns;
+  output_columns.reserve(projection.selected_field_indices().size());
+  for (int32_t input_index : projection.selected_field_indices()) {
+    ICEBERG_PRECHECK(input_index >= 0 && input_index < 
input_record_batch->num_columns(),
+                     "Input field index {} out of range for batch with {} 
columns",
+                     input_index, input_record_batch->num_columns());
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(
+        auto taken_column,
+        ::arrow::compute::Take(*input_record_batch->column(input_index), 
*index_array));
+    output_columns.push_back(std::move(taken_column));
+  }
+
+  auto output_record_batch = ::arrow::RecordBatch::Make(
+      state->output_schema, static_cast<int64_t>(row_indices.size()),
+      std::move(output_columns));
+
+  ArrowArray output_array;
+  ICEBERG_ARROW_RETURN_NOT_OK(
+      ::arrow::ExportRecordBatch(*output_record_batch, &output_array));
+  internal::ArrowArrayGuard output_array_guard(&output_array);
+
+  return std::exchange(output_array, ArrowArray{});
+}
+
+}  // namespace
+
+void RegisterArrowProjectBatch() {
+  static std::once_flag flag;
+  std::call_once(flag, []() {
+    ProjectionContext::RegisterProjectBatchFunction(&ProjectBatchArrowCompute);
+  });
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/arrow/arrow_io_register.cc 
b/src/iceberg/arrow/arrow_register.cc
similarity index 50%
rename from src/iceberg/arrow/arrow_io_register.cc
rename to src/iceberg/arrow/arrow_register.cc
index 43273c0a..d2983ffa 100644
--- a/src/iceberg/arrow/arrow_io_register.cc
+++ b/src/iceberg/arrow/arrow_register.cc
@@ -1,28 +1,35 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "iceberg/arrow/arrow_io_register.h"
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/arrow/arrow_register.h"
 
 #include <mutex>
 #include <string>
+#include <unordered_map>
 
 #include "iceberg/arrow/arrow_io_util.h"
 #include "iceberg/file_io_registry.h"
 
+namespace iceberg {
+void RegisterArrowProjectBatch();
+}
+
 namespace iceberg::arrow {
 
 namespace {
@@ -43,8 +50,6 @@ void RegisterS3FileIO() {
 #endif
 }
 
-}  // namespace
-
 void EnsureArrowFileIOsRegistered() {
   static std::once_flag flag;
   std::call_once(flag, []() {
@@ -58,4 +63,11 @@ void EnsureArrowFileIOsRegistered() {
   return true;
 }();
 
+}  // namespace
+
+void RegisterAll() {
+  EnsureArrowFileIOsRegistered();
+  ::iceberg::RegisterArrowProjectBatch();
+}
+
 }  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_io_register.h 
b/src/iceberg/arrow/arrow_register.h
similarity index 79%
rename from src/iceberg/arrow/arrow_io_register.h
rename to src/iceberg/arrow/arrow_register.h
index f28b7a56..7a67be51 100644
--- a/src/iceberg/arrow/arrow_io_register.h
+++ b/src/iceberg/arrow/arrow_register.h
@@ -19,16 +19,16 @@
 
 #pragma once
 
-/// \file iceberg/arrow/arrow_io_register.h
-/// \brief Provide functions to register Arrow FileIO implementations.
+/// \file iceberg/arrow/arrow_register.h
+/// \brief Provide functions to register Arrow bundle integrations.
 
 #include "iceberg/iceberg_bundle_export.h"
 
 namespace iceberg::arrow {
 
-/// \brief Register built-in Arrow FileIO implementations into the 
FileIORegistry.
+/// \brief Register Arrow FileIOs and Arrow-backed C Data utilities.
 ///
 /// This operation is idempotent and safe to call multiple times.
-ICEBERG_BUNDLE_EXPORT void EnsureArrowFileIOsRegistered();
+ICEBERG_BUNDLE_EXPORT void RegisterAll();
 
 }  // namespace iceberg::arrow
diff --git a/src/iceberg/arrow_c_data_util.cc b/src/iceberg/arrow_c_data_util.cc
new file mode 100644
index 00000000..a1d765d2
--- /dev/null
+++ b/src/iceberg/arrow_c_data_util.cc
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <utility>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
+  return MakeArrowArrayStream<Reader>(std::move(reader));
+}
+
+namespace {
+
+Result<size_t> FindFieldIndexById(std::span<const SchemaField> fields, int32_t 
field_id) {
+  for (size_t index = 0; index < fields.size(); ++index) {
+    if (fields[index].field_id() == field_id) {
+      return index;
+    }
+  }
+  return InvalidArgument("Required schema does not contain projected field id 
{}",
+                         field_id);
+}
+
+std::mutex g_project_batch_function_mutex;
+ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr;
+
+ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() {
+  std::lock_guard lock(g_project_batch_function_mutex);
+  return g_project_batch_function;
+}
+
+Result<std::vector<int32_t>> BuildSelectedFieldIndices(
+    std::span<const SchemaField> input_fields,
+    std::span<const SchemaField> output_fields) {
+  std::vector<int32_t> selected_field_indices;
+  selected_field_indices.reserve(output_fields.size());
+
+  for (const auto& output_field : output_fields) {
+    ICEBERG_ASSIGN_OR_RAISE(auto input_index,
+                            FindFieldIndexById(input_fields, 
output_field.field_id()));
+    const auto& input_field = input_fields[input_index];
+    if (*input_field.type() != *output_field.type()) {
+      return InvalidArgument(
+          "ProjectBatch only supports complete top-level fields, but field id "
+          "{} changes type from {} to {}",
+          output_field.field_id(), input_field.type()->ToString(),
+          output_field.type()->ToString());
+    }
+    ICEBERG_PRECHECK(
+        input_index <= 
static_cast<size_t>(std::numeric_limits<int32_t>::max()),
+        "Input field index {} exceeds int32 range", input_index);
+    selected_field_indices.push_back(static_cast<int32_t>(input_index));
+  }
+
+  return selected_field_indices;
+}
+
+Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& 
input_array,
+                   const ArrowArrayView& input_view, int64_t row_index,
+                   ArrowArray* output_array);
+
+Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray& 
input_array,
+                        const ArrowArrayView& input_view, int64_t row_index,
+                        ArrowArray* output_array) {
+  const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index);
+  const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 
1);
+  for (int64_t element_index = begin; element_index < end; ++element_index) {
+    ICEBERG_RETURN_UNEXPECTED(
+        AppendValue(*input_schema.children[0], *input_array.children[0],
+                    *input_view.children[0], element_index, 
output_array->children[0]));
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+  return {};
+}
+
+Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray& 
input_array,
+                       const ArrowArrayView& input_view, int64_t row_index,
+                       ArrowArray* output_array) {
+  const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index);
+  const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index + 
1);
+  for (int64_t entry_index = begin; entry_index < end; ++entry_index) {
+    ICEBERG_RETURN_UNEXPECTED(
+        AppendValue(*input_schema.children[0], *input_array.children[0],
+                    *input_view.children[0], entry_index, 
output_array->children[0]));
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+  return {};
+}
+
+Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView& 
input_view,
+                     int64_t row_index, ArrowArray* output_array) {
+  ArrowError error;
+  ArrowSchemaView schema_view;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowSchemaViewInit(&schema_view, &input_schema, &error), error);
+
+  ArrowDecimal value;
+  ArrowDecimalInit(&value, schema_view.decimal_bitwidth, 
schema_view.decimal_precision,
+                   schema_view.decimal_scale);
+  ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array, 
&value));
+  return {};
+}
+
+Status AppendValue(const ArrowSchema& input_schema, const ArrowArray& 
input_array,
+                   const ArrowArrayView& input_view, int64_t row_index,
+                   ArrowArray* output_array) {
+  if (ArrowArrayViewIsNull(&input_view, row_index)) {
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1));
+    return {};
+  }
+
+  switch (input_view.storage_type) {
+    case NANOARROW_TYPE_NA:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 
1));
+      return {};
+    case NANOARROW_TYPE_BOOL:
+    case NANOARROW_TYPE_INT8:
+    case NANOARROW_TYPE_INT16:
+    case NANOARROW_TYPE_INT32:
+    case NANOARROW_TYPE_INT64:
+    case NANOARROW_TYPE_DATE32:
+    case NANOARROW_TYPE_DATE64:
+    case NANOARROW_TYPE_TIME32:
+    case NANOARROW_TYPE_TIME64:
+    case NANOARROW_TYPE_TIMESTAMP:
+    case NANOARROW_TYPE_DURATION:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(
+          output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index)));
+      return {};
+    case NANOARROW_TYPE_UINT8:
+    case NANOARROW_TYPE_UINT16:
+    case NANOARROW_TYPE_UINT32:
+    case NANOARROW_TYPE_UINT64:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt(
+          output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index)));
+      return {};
+    case NANOARROW_TYPE_HALF_FLOAT:
+    case NANOARROW_TYPE_FLOAT:
+    case NANOARROW_TYPE_DOUBLE:
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble(
+          output_array, ArrowArrayViewGetDoubleUnsafe(&input_view, 
row_index)));
+      return {};
+    case NANOARROW_TYPE_STRING:
+    case NANOARROW_TYPE_LARGE_STRING:
+    case NANOARROW_TYPE_STRING_VIEW: {
+      auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index);
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array, 
value));
+      return {};
+    }
+    case NANOARROW_TYPE_BINARY:
+    case NANOARROW_TYPE_LARGE_BINARY:
+    case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+    case NANOARROW_TYPE_BINARY_VIEW: {
+      auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index);
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array, 
value));
+      return {};
+    }
+    case NANOARROW_TYPE_DECIMAL128:
+    case NANOARROW_TYPE_DECIMAL256:
+      return AppendDecimal(input_schema, input_view, row_index, output_array);
+    case NANOARROW_TYPE_STRUCT: {
+      for (int64_t child_index = 0; child_index < input_schema.n_children;
+           ++child_index) {
+        ICEBERG_RETURN_UNEXPECTED(AppendValue(
+            *input_schema.children[child_index], 
*input_array.children[child_index],
+            *input_view.children[child_index], row_index,
+            output_array->children[child_index]));
+      }
+      
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+      return {};
+    }
+    case NANOARROW_TYPE_LIST:
+    case NANOARROW_TYPE_LARGE_LIST:
+    case NANOARROW_TYPE_FIXED_SIZE_LIST:
+      return AppendListValues(input_schema, input_array, input_view, row_index,
+                              output_array);
+    case NANOARROW_TYPE_MAP:
+      return AppendMapValues(input_schema, input_array, input_view, row_index,
+                             output_array);
+    default:
+      return NotImplemented("Unsupported Arrow type for merge-on-read 
projection: {}",
+                            static_cast<int>(input_view.storage_type));
+  }
+}
+
+}  // namespace
+
+ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept
+    : input_schema_(std::exchange(other.input_schema_, nullptr)),
+      output_schema_(std::exchange(other.output_schema_, nullptr)),
+      selected_field_indices_(std::move(other.selected_field_indices_)),
+      input_arrow_schema_(other.input_arrow_schema_),
+      output_arrow_schema_(other.output_arrow_schema_),
+      project_batch_function_(std::exchange(other.project_batch_function_, 
nullptr)),
+      project_batch_state_(std::move(other.project_batch_state_)) {
+  other.input_arrow_schema_.release = nullptr;
+  other.output_arrow_schema_.release = nullptr;
+}
+
+ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other) 
noexcept {
+  if (this == &other) {
+    return *this;
+  }
+
+  if (input_arrow_schema_.release != nullptr) {
+    input_arrow_schema_.release(&input_arrow_schema_);
+  }
+  if (output_arrow_schema_.release != nullptr) {
+    output_arrow_schema_.release(&output_arrow_schema_);
+  }
+
+  input_schema_ = std::exchange(other.input_schema_, nullptr);
+  output_schema_ = std::exchange(other.output_schema_, nullptr);
+  selected_field_indices_ = std::move(other.selected_field_indices_);
+  input_arrow_schema_ = other.input_arrow_schema_;
+  other.input_arrow_schema_.release = nullptr;
+  output_arrow_schema_ = other.output_arrow_schema_;
+  other.output_arrow_schema_.release = nullptr;
+  project_batch_function_ = std::exchange(other.project_batch_function_, 
nullptr);
+  project_batch_state_ = std::move(other.project_batch_state_);
+  return *this;
+}
+
+ProjectionContext::~ProjectionContext() {
+  if (input_arrow_schema_.release != nullptr) {
+    input_arrow_schema_.release(&input_arrow_schema_);
+  }
+  if (output_arrow_schema_.release != nullptr) {
+    output_arrow_schema_.release(&output_arrow_schema_);
+  }
+}
+
+Result<ProjectionContext> ProjectionContext::Make(
+    const Schema& input_schema, const Schema& output_schema,
+    ProjectionContext::ProjectBatchFunction project_batch_function) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto selected_field_indices,
+      BuildSelectedFieldIndices(input_schema.fields(), 
output_schema.fields()));
+
+  ProjectionContext context;
+  context.input_schema_ = &input_schema;
+  context.output_schema_ = &output_schema;
+  context.selected_field_indices_ = std::move(selected_field_indices);
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema, 
&context.input_arrow_schema_));
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema, 
&context.output_arrow_schema_));
+  context.project_batch_function_ = project_batch_function;
+
+  return context;
+}
+
+const Schema& ProjectionContext::input_schema() const { return *input_schema_; 
}
+
+const Schema& ProjectionContext::output_schema() const { return 
*output_schema_; }
+
+const ArrowSchema& ProjectionContext::input_arrow_schema() const {
+  return input_arrow_schema_;
+}
+
+const ArrowSchema& ProjectionContext::output_arrow_schema() const {
+  return output_arrow_schema_;
+}
+
+std::span<const int32_t> ProjectionContext::selected_field_indices() const {
+  return selected_field_indices_;
+}
+
+ProjectionContext::ProjectBatchFunction 
ProjectionContext::project_batch_function()
+    const {
+  return project_batch_function_;
+}
+
+ProjectionContext::ProjectBatchState& ProjectionContext::project_batch_state() 
{
+  return project_batch_state_;
+}
+
+void ProjectionContext::RegisterProjectBatchFunction(
+    ProjectionContext::ProjectBatchFunction project_batch_function) {
+  ICEBERG_DCHECK(project_batch_function != nullptr,
+                 "ProjectBatch implementation must not be null");
+  if (project_batch_function == nullptr) {
+    return;
+  }
+  std::lock_guard lock(g_project_batch_function_mutex);
+  g_project_batch_function = project_batch_function;
+}
+
+bool ProjectionContext::HasProjectBatchFunction() {
+  return GetProjectBatchFunction() != nullptr;
+}
+
+auto ProjectionContext::ResolveProjectBatchFunction()
+    -> ProjectionContext::ProjectBatchFunction {
+  return GetProjectBatchFunction();
+}
+
+namespace {
+
+Result<ArrowArray> ProjectBatchNanoarrow(
+    const ArrowSchema& input_arrow_schema, const ArrowArray& input_batch,
+    std::span<const int32_t> row_indices, const ArrowSchema& 
output_arrow_schema,
+    std::span<const int32_t> selected_field_indices) {
+  ArrowArrayView input_view;
+  ArrowError error;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewInitFromSchema(&input_view, &input_arrow_schema, &error), 
error);
+  internal::ArrowArrayViewGuard input_view_guard(&input_view);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewSetArray(&input_view, &input_batch, &error), error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_DEFAULT, 
&error),
+      error);
+
+  ArrowArray output_array;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayInitFromSchema(&output_array, &output_arrow_schema, &error), 
error);
+  internal::ArrowArrayGuard output_array_guard(&output_array);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&output_array));
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(
+      ArrowArrayReserve(&output_array, 
static_cast<int64_t>(row_indices.size())));
+
+  for (int64_t row_index : row_indices) {
+    ICEBERG_PRECHECK(row_index >= 0 && row_index < input_batch.length,
+                     "Row index {} out of range for batch length {}", 
row_index,
+                     input_batch.length);
+    for (size_t output_index = 0; output_index < selected_field_indices.size();
+         ++output_index) {
+      const int32_t input_index = selected_field_indices[output_index];
+      
ICEBERG_RETURN_UNEXPECTED(AppendValue(*input_arrow_schema.children[input_index],
+                                            *input_batch.children[input_index],
+                                            *input_view.children[input_index], 
row_index,
+                                            
output_array.children[output_index]));
+    }
+    
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&output_array));
+  }
+
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayFinishBuildingDefault(&output_array, &error), error);
+
+  return std::exchange(output_array, ArrowArray{});
+}
+
+}  // namespace
+
+Result<ArrowArray> ProjectBatch(ArrowArray* input_batch,
+                                std::span<const int32_t> row_indices,
+                                ProjectionContext& projection) {
+  ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
+  internal::ArrowArrayGuard input_batch_guard(input_batch);
+
+  auto project_batch_function = projection.project_batch_function();
+  if (project_batch_function != nullptr) {
+    // ProjectBatch owns input_batch. Arrow-backed implementations import it 
and clear
+    // release, so input_batch_guard becomes a no-op instead of 
double-releasing.
+    return project_batch_function(input_batch, row_indices, projection);
+  }
+
+  return ProjectBatchNanoarrow(projection.input_arrow_schema(), *input_batch, 
row_indices,
+                               projection.output_arrow_schema(),
+                               projection.selected_field_indices());
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/arrow_c_data_util_internal.h 
b/src/iceberg/arrow_c_data_util_internal.h
new file mode 100644
index 00000000..e02db29a
--- /dev/null
+++ b/src/iceberg/arrow_c_data_util_internal.h
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cerrno>
+#include <concepts>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <optional>
+#include <span>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Cached state for ProjectBatch over one input/output schema pair.
+///
+/// Exported because this internal utility is shared across library 
translation units.
+class ICEBERG_EXPORT ProjectionContext {
+ public:
+  using ProjectBatchState = std::shared_ptr<void>;
+
+  using ProjectBatchFunction = auto (*)(ArrowArray* input_batch,
+                                        std::span<const int32_t> row_indices,
+                                        ProjectionContext& projection)
+      -> Result<ArrowArray>;
+
+  /// \brief Register a custom implementation for ProjectBatch.
+  ///
+  /// Registration is process-wide. If multiple implementations are registered,
+  /// the last non-null implementation wins.
+  static void RegisterProjectBatchFunction(ProjectBatchFunction 
project_batch_function);
+
+  /// \brief Returns true when a custom implementation has been registered.
+  static bool HasProjectBatchFunction();
+
+  /// \brief Resolve the registered ProjectBatch implementation.
+  static auto ResolveProjectBatchFunction() -> ProjectBatchFunction;
+
+  /// \brief Build reusable projection state for a validated schema pair.
+  ///
+  /// \param input_schema Schema that describes every input batch.
+  /// \param output_schema Final schema and column order requested by the 
caller.
+  /// \param project_batch_function Optional implementation returned by
+  ///   ProjectionContext::ResolveProjectBatchFunction, or nullptr to use the 
nanoarrow
+  ///   path.
+  /// \note It validates that output_schema selects or reorders complete 
top-level fields
+  /// by field id. Nested pruning and type changes are rejected. The 
input_schema and
+  /// output_schema passed to Make must outlive the context. ProjectBatch may 
lazily
+  /// initialize backend cache; do not share one context across concurrent 
calls.
+  static Result<ProjectionContext> Make(const Schema& input_schema,
+                                        const Schema& output_schema,
+                                        ProjectBatchFunction 
project_batch_function);
+
+  ProjectionContext(ProjectionContext&&) noexcept;
+  ProjectionContext& operator=(ProjectionContext&&) noexcept;
+  ~ProjectionContext();
+
+  ProjectionContext(const ProjectionContext&) = delete;
+  ProjectionContext& operator=(const ProjectionContext&) = delete;
+
+  const Schema& input_schema() const;
+
+  const Schema& output_schema() const;
+
+  const ArrowSchema& input_arrow_schema() const;
+
+  const ArrowSchema& output_arrow_schema() const;
+
+  std::span<const int32_t> selected_field_indices() const;
+
+  ProjectBatchFunction project_batch_function() const;
+
+  ProjectBatchState& project_batch_state();
+
+ private:
+  ProjectionContext() = default;
+
+  // Raw schema pointers are borrowed from caller-owned schemas. 
FileScanTaskReader
+  // keeps those schema objects alive in the same stream source that owns this 
context.
+  const Schema* input_schema_ = nullptr;
+  const Schema* output_schema_ = nullptr;
+  std::vector<int32_t> selected_field_indices_;
+  ArrowSchema input_arrow_schema_{};
+  ArrowSchema output_arrow_schema_{};
+  ProjectBatchFunction project_batch_function_ = nullptr;
+  ProjectBatchState project_batch_state_;
+};
+
+/// \brief Concept for sources that can be wrapped as ArrowArrayStreams.
+template <typename Source>
+concept ArrowArrayStreamProvider = requires(Source& source) {
+  { source.Close() } -> std::same_as<Status>;
+  { source.Next() } -> std::same_as<Result<std::optional<ArrowArray>>>;
+  { source.Schema() } -> std::same_as<Result<ArrowSchema>>;
+};
+
+namespace detail {
+
+template <ArrowArrayStreamProvider Source>
+struct ArrowArrayStreamPrivateData {
+  std::unique_ptr<Source> source;
+  std::string last_error;
+
+  explicit ArrowArrayStreamPrivateData(std::unique_ptr<Source> src)
+      : source(std::move(src)) {}
+
+  ~ArrowArrayStreamPrivateData() {
+    if (source != nullptr) {
+      std::ignore = source->Close();
+    }
+  }
+};
+
+template <ArrowArrayStreamProvider Source>
+int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
+  if (stream == nullptr || stream->private_data == nullptr) {
+    return EINVAL;
+  }
+
+  auto* private_data =
+      static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
+  auto schema_result = private_data->source->Schema();
+  if (!schema_result.has_value()) {
+    private_data->last_error = schema_result.error().message;
+    std::memset(out, 0, sizeof(ArrowSchema));
+    return EIO;
+  }
+
+  *out = std::move(schema_result.value());
+  return 0;
+}
+
+template <ArrowArrayStreamProvider Source>
+int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
+  if (stream == nullptr || stream->private_data == nullptr) {
+    return EINVAL;
+  }
+
+  auto* private_data =
+      static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
+  auto next_result = private_data->source->Next();
+  if (!next_result.has_value()) {
+    private_data->last_error = next_result.error().message;
+    std::memset(out, 0, sizeof(ArrowArray));
+    return EIO;
+  }
+
+  auto& optional_array = next_result.value();
+  if (optional_array.has_value()) {
+    *out = std::move(optional_array.value());
+  } else {
+    std::memset(out, 0, sizeof(ArrowArray));
+  }
+
+  return 0;
+}
+
+template <ArrowArrayStreamProvider Source>
+const char* GetLastError(struct ArrowArrayStream* stream) {
+  if (stream == nullptr || stream->private_data == nullptr) {
+    return nullptr;
+  }
+
+  auto* private_data =
+      static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
+  return private_data->last_error.empty() ? nullptr : 
private_data->last_error.c_str();
+}
+
+template <ArrowArrayStreamProvider Source>
+void Release(struct ArrowArrayStream* stream) {
+  if (stream == nullptr || stream->private_data == nullptr) {
+    return;
+  }
+
+  delete 
static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
+  stream->private_data = nullptr;
+  stream->release = nullptr;
+}
+
+}  // namespace detail
+
+/// \brief Wrap an object with Close, Next, and Schema as an ArrowArrayStream.
+template <ArrowArrayStreamProvider Source>
+Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Source> source) {
+  if (source == nullptr) {
+    return InvalidArgument("Cannot make ArrowArrayStream from null source");
+  }
+
+  auto private_data =
+      
std::make_unique<detail::ArrowArrayStreamPrivateData<Source>>(std::move(source));
+  ArrowArrayStream stream{.get_schema = detail::GetSchema<Source>,
+                          .get_next = detail::GetNext<Source>,
+                          .get_last_error = detail::GetLastError<Source>,
+                          .release = detail::Release<Source>,
+                          .private_data = private_data.release()};
+  return stream;
+}
+
+/// \brief Wrap a Reader as an ArrowArrayStream.
+ICEBERG_EXPORT Result<ArrowArrayStream> MakeArrowArrayStream(
+    std::unique_ptr<Reader> reader);
+
+/// \brief Project selected rows from a batch into complete top-level fields.
+///
+/// `input_batch` is consumed by this function. If the projection carries a 
registered
+/// implementation, the call is delegated to it; otherwise the built-in 
nanoarrow
+/// implementation is used. The projection must have been created for the 
stable schema
+/// pair that describes the input batch and requested output. This function 
does not
+/// revalidate schema compatibility on each batch.
+///
+/// \param input_batch Owned Arrow C Data batch to project.
+/// \param row_indices Zero-based row positions to copy from `input_batch`.
+/// \param projection Reusable schema/projection state created by
+///   ProjectionContext::Make.
+/// \return A newly owned ArrowArray matching `projection.output_schema()`.
+ICEBERG_EXPORT Result<ArrowArray> ProjectBatch(ArrowArray* input_batch,
+                                               std::span<const int32_t> 
row_indices,
+                                               ProjectionContext& projection);
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/delete_filter.cc 
b/src/iceberg/data/delete_filter.cc
index 876d644e..5f21a32d 100644
--- a/src/iceberg/data/delete_filter.cc
+++ b/src/iceberg/data/delete_filter.cc
@@ -20,6 +20,7 @@
 #include "iceberg/data/delete_filter.h"
 
 #include <algorithm>
+#include <limits>
 #include <map>
 #include <optional>
 #include <set>
@@ -745,7 +746,10 @@ Result<AliveRowSelection> 
DeleteFilter::ComputeAliveRows(const ArrowSchema& batc
     return result;
   }
 
-  result.indices.reserve(batch.length);
+  ICEBERG_PRECHECK(
+      batch.length <= 
static_cast<int64_t>(std::numeric_limits<int32_t>::max()),
+      "Batch length {} exceeds int32_t row index capacity", batch.length);
+  result.indices.reserve(static_cast<size_t>(batch.length));
   ICEBERG_ASSIGN_OR_RAISE(auto row, ArrowArrayStructLike::Make(batch_schema, 
batch));
 
   for (int64_t i = 0; i < batch.length; ++i) {
diff --git a/src/iceberg/data/file_scan_task_reader.cc 
b/src/iceberg/data/file_scan_task_reader.cc
new file mode 100644
index 00000000..7076486a
--- /dev/null
+++ b/src/iceberg/data/file_scan_task_reader.cc
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/file_scan_task_reader.h"
+
+#include <algorithm>
+#include <memory>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/data/delete_filter.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+ReaderOptions MakeReaderOptions(const DataFile& data_file, 
std::shared_ptr<FileIO> io,
+                                std::shared_ptr<Schema> projection,
+                                std::shared_ptr<Expression> filter,
+                                std::shared_ptr<NameMapping> name_mapping,
+                                ReaderProperties properties) {
+  return ReaderOptions{
+      .path = data_file.file_path,
+      .length = static_cast<size_t>(data_file.file_size_in_bytes),
+      .io = std::move(io),
+      .projection = std::move(projection),
+      .filter = std::move(filter),
+      .name_mapping = std::move(name_mapping),
+      .properties = std::move(properties),
+  };
+}
+
+class MergeOnReadStreamSource {
+ public:
+  MergeOnReadStreamSource(std::unique_ptr<Reader> reader,
+                          std::unique_ptr<DeleteFilter> delete_filter,
+                          std::shared_ptr<::iceberg::Schema> required_schema,
+                          std::shared_ptr<::iceberg::Schema> projected_schema,
+                          ProjectionContext projection_context)
+      : reader_(std::move(reader)),
+        delete_filter_(std::move(delete_filter)),
+        required_schema_(std::move(required_schema)),
+        projected_schema_(std::move(projected_schema)),
+        project_all_rows_(required_schema_->SameSchema(*projected_schema_)),
+        projection_context_(std::move(projection_context)) {}
+
+  ~MergeOnReadStreamSource() {
+    if (cached_schema_.has_value() && cached_schema_->release != nullptr) {
+      cached_schema_->release(&cached_schema_.value());
+    }
+  }
+
+  Status Close() {
+    if (reader_ == nullptr) {
+      return {};
+    }
+    return reader_->Close();
+  }
+
+  Result<std::optional<ArrowArray>> Next() {
+    if (!cached_schema_.has_value()) {
+      // File readers expose one stable Arrow schema for every batch in the 
stream.
+      ICEBERG_ASSIGN_OR_RAISE(cached_schema_, reader_->Schema());
+    }
+    ArrowSchema& input_arrow_schema = cached_schema_.value();
+
+    while (true) {
+      ICEBERG_ASSIGN_OR_RAISE(auto next_batch, reader_->Next());
+      if (!next_batch.has_value()) {
+        return std::nullopt;
+      }
+
+      ArrowArray input_batch = std::move(next_batch.value());
+      internal::ArrowArrayGuard input_batch_guard(&input_batch);
+
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto alive, delete_filter_->ComputeAliveRows(input_arrow_schema, 
input_batch));
+      if (alive.empty()) {
+        continue;
+      }
+
+      if (alive.alive_count() == input_batch.length && project_all_rows_) {
+        // Transfer ownership to the stream result; the local guard must not 
release it.
+        ArrowArray output_batch = input_batch;
+        input_batch.release = nullptr;
+        return output_batch;
+      }
+
+      return ProjectBatch(&input_batch, alive.indices, projection_context_);
+    }
+  }
+
+  Result<ArrowSchema> Schema() {
+    ArrowSchema schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*projected_schema_, &schema));
+    return schema;
+  }
+
+ private:
+  std::unique_ptr<Reader> reader_;
+  std::unique_ptr<DeleteFilter> delete_filter_;
+  std::shared_ptr<::iceberg::Schema> required_schema_;
+  std::shared_ptr<::iceberg::Schema> projected_schema_;
+  bool project_all_rows_ = false;
+  ProjectionContext projection_context_;
+  std::optional<ArrowSchema> cached_schema_;
+};
+
+}  // namespace
+
+class FileScanTaskReader::Impl {
+ public:
+  static Result<std::unique_ptr<Impl>> Make(Options options) {
+    ICEBERG_PRECHECK(options.io != nullptr, "FileIO must not be null");
+    ICEBERG_PRECHECK(options.table_schema != nullptr, "Table schema must not 
be null");
+    ICEBERG_PRECHECK(options.projected_schema != nullptr,
+                     "Projected schema must not be null");
+    for (const auto& schema : options.schemas) {
+      ICEBERG_PRECHECK(schema != nullptr, "Schema list must not contain null 
schemas");
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto field_lookup,
+        DeleteFilter::MakeFieldLookup(options.table_schema, options.schemas));
+    auto delete_counter = std::make_shared<DeleteCounter>();
+
+    return std::unique_ptr<Impl>(
+        new Impl(std::move(options), std::move(field_lookup), 
std::move(delete_counter)));
+  }
+
+  Result<ArrowArrayStream> Open(const FileScanTask& task) {
+    const auto& data_file = task.data_file();
+    ICEBERG_PRECHECK(data_file != nullptr, "Data file must not be null");
+    ICEBERG_PRECHECK(data_file->file_size_in_bytes >= 0,
+                     "Data file size must not be negative: {}",
+                     data_file->file_size_in_bytes);
+
+    if (task.delete_files().empty()) {
+      auto options =
+          MakeReaderOptions(*data_file, io_, projected_schema_, 
task.residual_filter(),
+                            name_mapping_, properties_);
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto reader, ReaderFactoryRegistry::Open(data_file->file_format, 
options));
+      return MakeArrowArrayStream(std::move(reader));
+    }
+
+    const bool has_position_deletes =
+        std::any_of(task.delete_files().begin(), task.delete_files().end(),
+                    [](const std::shared_ptr<DataFile>& f) {
+                      return f->content == DataFile::Content::kPositionDeletes;
+                    });
+
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto delete_filter,
+        DeleteFilter::Make(data_file->file_path, task.delete_files(), 
projected_schema_,
+                           io_, field_lookup_, has_position_deletes, 
delete_counter_));
+
+    auto required_schema = delete_filter->RequiredSchema();
+    auto project_batch_function = 
ProjectionContext::ResolveProjectBatchFunction();
+    // ProjectionContext borrows schemas that are kept in 
MergeOnReadStreamSource.
+    ICEBERG_ASSIGN_OR_RAISE(auto projection_context,
+                            ProjectionContext::Make(*required_schema, 
*projected_schema_,
+                                                    project_batch_function));
+
+    auto options = MakeReaderOptions(*data_file, io_, required_schema,
+                                     task.residual_filter(), name_mapping_, 
properties_);
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            
ReaderFactoryRegistry::Open(data_file->file_format, options));
+
+    auto mor_reader = std::make_unique<MergeOnReadStreamSource>(
+        std::move(reader), std::move(delete_filter), 
std::move(required_schema),
+        projected_schema_, std::move(projection_context));
+    return MakeArrowArrayStream(std::move(mor_reader));
+  }
+
+ private:
+  Impl(Options options, DeleteFilter::FieldLookup field_lookup,
+       std::shared_ptr<DeleteCounter> delete_counter)
+      : io_(std::move(options.io)),
+        schemas_(std::move(options.schemas)),
+        projected_schema_(std::move(options.projected_schema)),
+        name_mapping_(std::move(options.name_mapping)),
+        properties_(ReaderProperties::FromMap(options.properties)),
+        field_lookup_(std::move(field_lookup)),
+        delete_counter_(std::move(delete_counter)) {}
+
+  std::shared_ptr<FileIO> io_;
+  std::vector<std::shared_ptr<Schema>> schemas_;
+  std::shared_ptr<Schema> projected_schema_;
+  std::shared_ptr<NameMapping> name_mapping_;
+  ReaderProperties properties_;
+  DeleteFilter::FieldLookup field_lookup_;
+  std::shared_ptr<DeleteCounter> delete_counter_;
+};
+
+Result<std::unique_ptr<FileScanTaskReader>> FileScanTaskReader::Make(Options 
options) {
+  ICEBERG_ASSIGN_OR_RAISE(auto impl, Impl::Make(std::move(options)));
+  return std::unique_ptr<FileScanTaskReader>(new 
FileScanTaskReader(std::move(impl)));
+}
+
+FileScanTaskReader::FileScanTaskReader(std::unique_ptr<Impl> impl)
+    : impl_(std::move(impl)) {}
+
+FileScanTaskReader::~FileScanTaskReader() = default;
+
+Result<ArrowArrayStream> FileScanTaskReader::Open(const FileScanTask& task) {
+  return impl_->Open(task);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/file_scan_task_reader.h 
b/src/iceberg/data/file_scan_task_reader.h
new file mode 100644
index 00000000..a71ef5f8
--- /dev/null
+++ b/src/iceberg/data/file_scan_task_reader.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/data/file_scan_task_reader.h
+/// Delete-aware FileScanTask reader for copy-on-write and merge-on-read paths.
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/iceberg_data_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Opens a FileScanTask as an ArrowArrayStream.
+///
+/// FileScanTaskReader chooses the copy-on-write path for tasks without 
deletes and
+/// the merge-on-read path for tasks with v2 position or equality deletes. The 
returned
+/// stream always exposes `projected_schema`.
+///
+/// TODO(gangwu): Add a mode that emits a `_deleted` column instead of 
filtering rows.
+/// TODO(gangwu): Use evaluator to apply residual expression filters.
+class ICEBERG_DATA_EXPORT FileScanTaskReader {
+ public:
+  /// \brief Options shared by all tasks opened by this reader.
+  struct Options {
+    /// FileIO instance for reading data and delete files.
+    std::shared_ptr<FileIO> io;
+    /// The table schema. Used as the primary field lookup for delete file 
resolution.
+    std::shared_ptr<Schema> table_schema;
+    /// Optional list of historical table schemas for field lookup.
+    std::vector<std::shared_ptr<Schema>> schemas;
+    /// The output schema for the returned ArrowArrayStream. Must be a
+    /// projection of table_schema.
+    std::shared_ptr<Schema> projected_schema;
+    /// Optional name mapping for files written without field IDs.
+    std::shared_ptr<NameMapping> name_mapping;
+    /// Format-specific or implementation-specific options for data readers.
+    std::unordered_map<std::string, std::string> properties;
+  };
+
+  /// \brief Create a reusable task reader from shared read context.
+  static Result<std::unique_ptr<FileScanTaskReader>> Make(Options options);
+
+  ~FileScanTaskReader();
+
+  /// \brief Open a task and return an Arrow C stream for its projected live 
rows.
+  Result<ArrowArrayStream> Open(const FileScanTask& task);
+
+  FileScanTaskReader(const FileScanTaskReader&) = delete;
+  FileScanTaskReader& operator=(const FileScanTaskReader&) = delete;
+
+ private:
+  class Impl;
+  explicit FileScanTaskReader(std::unique_ptr<Impl> impl);
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/data/meson.build b/src/iceberg/data/meson.build
index f0877ec6..bbb26db2 100644
--- a/src/iceberg/data/meson.build
+++ b/src/iceberg/data/meson.build
@@ -21,6 +21,7 @@ install_headers(
         'delete_filter.h',
         'delete_loader.h',
         'equality_delete_writer.h',
+        'file_scan_task_reader.h',
         'position_delete_writer.h',
         'writer.h',
     ],
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index c31d9b29..c76c1009 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -68,7 +68,7 @@ struct ICEBERG_EXPORT Split {
   size_t length;
 };
 
-class ReaderProperties : public ConfigBase<ReaderProperties> {
+class ICEBERG_EXPORT ReaderProperties : public ConfigBase<ReaderProperties> {
  public:
   template <typename T>
   using Entry = const ConfigBase<ReaderProperties>::Entry<T>;
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 03dc2447..69fe733b 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -41,6 +41,7 @@ configure_file(
 iceberg_include_dir = include_directories('..')
 iceberg_sources = files(
     'arrow_c_data_guard_internal.cc',
+    'arrow_c_data_util.cc',
     'catalog/memory/in_memory_catalog.cc',
     'delete_file_index.cc',
     'expression/aggregate.cc',
@@ -147,6 +148,7 @@ iceberg_data_sources = files(
     'data/delete_filter.cc',
     'data/delete_loader.cc',
     'data/equality_delete_writer.cc',
+    'data/file_scan_task_reader.cc',
     'data/position_delete_writer.cc',
     'data/writer.cc',
     'deletes/position_delete_index.cc',
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 71075d90..6881d34f 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -20,14 +20,11 @@
 #include "iceberg/table_scan.h"
 
 #include <cstdint>
-#include <cstring>
-#include <iterator>
 #include <utility>
 
 #include "iceberg/expression/binder.h"
 #include "iceberg/expression/expression.h"
 #include "iceberg/expression/residual_evaluator.h"
-#include "iceberg/file_reader.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_group.h"
 #include "iceberg/result.h"
@@ -61,103 +58,6 @@ const std::vector<std::string> kScanColumnsWithStats = [] {
   return cols;
 }();
 
-/// \brief Private data structure to hold the Reader and error state
-struct ReaderStreamPrivateData {
-  std::unique_ptr<Reader> reader;
-  std::string last_error;
-
-  explicit ReaderStreamPrivateData(std::unique_ptr<Reader> reader_ptr)
-      : reader(std::move(reader_ptr)) {}
-
-  ~ReaderStreamPrivateData() {
-    if (reader) {
-      std::ignore = reader->Close();
-    }
-  }
-};
-
-/// \brief Callback to get the stream schema
-static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) 
{
-  if (!stream || !stream->private_data) {
-    return EINVAL;
-  }
-  auto* private_data = 
static_cast<ReaderStreamPrivateData*>(stream->private_data);
-  // Get schema from reader
-  auto schema_result = private_data->reader->Schema();
-  if (!schema_result.has_value()) {
-    private_data->last_error = schema_result.error().message;
-    std::memset(out, 0, sizeof(ArrowSchema));
-    return EIO;
-  }
-
-  *out = std::move(schema_result.value());
-  return 0;
-}
-
-/// \brief Callback to get the next array from the stream
-static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
-  if (!stream || !stream->private_data) {
-    return EINVAL;
-  }
-
-  auto* private_data = 
static_cast<ReaderStreamPrivateData*>(stream->private_data);
-
-  auto next_result = private_data->reader->Next();
-  if (!next_result.has_value()) {
-    private_data->last_error = next_result.error().message;
-    std::memset(out, 0, sizeof(ArrowArray));
-    return EIO;
-  }
-
-  auto& optional_array = next_result.value();
-  if (optional_array.has_value()) {
-    *out = std::move(optional_array.value());
-  } else {
-    // End of stream - set release to nullptr to signal end
-    std::memset(out, 0, sizeof(ArrowArray));
-    out->release = nullptr;
-  }
-
-  return 0;
-}
-
-/// \brief Callback to get the last error message
-static const char* GetLastError(struct ArrowArrayStream* stream) {
-  if (!stream || !stream->private_data) {
-    return nullptr;
-  }
-
-  auto* private_data = 
static_cast<ReaderStreamPrivateData*>(stream->private_data);
-  return private_data->last_error.empty() ? nullptr : 
private_data->last_error.c_str();
-}
-
-/// \brief Callback to release the stream resources
-static void Release(struct ArrowArrayStream* stream) {
-  if (!stream || !stream->private_data) {
-    return;
-  }
-
-  delete static_cast<ReaderStreamPrivateData*>(stream->private_data);
-  stream->private_data = nullptr;
-  stream->release = nullptr;
-}
-
-Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
-  if (!reader) {
-    return InvalidArgument("Reader cannot be null");
-  }
-
-  auto private_data = 
std::make_unique<ReaderStreamPrivateData>(std::move(reader));
-
-  ArrowArrayStream stream{.get_schema = GetSchema,
-                          .get_next = GetNext,
-                          .get_last_error = GetLastError,
-                          .release = Release,
-                          .private_data = private_data.release()};
-
-  return stream;
-}
-
 }  // namespace
 
 namespace internal {
@@ -280,24 +180,6 @@ int32_t FileScanTask::files_count() const { return 1; }
 
 int64_t FileScanTask::estimated_row_count() const { return 
data_file_->record_count; }
 
-Result<ArrowArrayStream> FileScanTask::ToArrow(
-    const std::shared_ptr<FileIO>& io, std::shared_ptr<Schema> 
projected_schema) const {
-  if (!delete_files_.empty()) {
-    return NotSupported("Reading data files with delete files is not yet 
supported.");
-  }
-
-  const ReaderOptions options{.path = data_file_->file_path,
-                              .length = data_file_->file_size_in_bytes,
-                              .io = io,
-                              .projection = std::move(projected_schema),
-                              .filter = residual_filter_};
-
-  ICEBERG_ASSIGN_OR_RAISE(auto reader,
-                          ReaderFactoryRegistry::Open(data_file_->file_format, 
options));
-
-  return MakeArrowArrayStream(std::move(reader));
-}
-
 // ChangelogScanTask implementation
 
 int64_t ChangelogScanTask::size_bytes() const {
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index 307946e6..64fb3ffd 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -27,7 +27,6 @@
 #include <unordered_set>
 #include <vector>
 
-#include "iceberg/arrow_c_data.h"
 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
 #include "iceberg/table_metadata.h"
@@ -87,16 +86,6 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
   int32_t files_count() const override;
   int64_t estimated_row_count() const override;
 
-  /// TODO(gangwu): move it to iceberg/data/task_scanner.h
-  ///
-  /// \brief Returns a C-ABI compatible ArrowArrayStream to read the data for 
this task.
-  ///
-  /// \param io The FileIO instance for accessing the file data.
-  /// \param projected_schema The projected schema for reading the data.
-  /// \return A Result containing an ArrowArrayStream, or an error on failure.
-  Result<ArrowArrayStream> ToArrow(const std::shared_ptr<FileIO>& io,
-                                   std::shared_ptr<Schema> projected_schema) 
const;
-
  private:
   std::shared_ptr<DataFile> data_file_;
   std::vector<std::shared_ptr<DataFile>> delete_files_;
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 37e66ed5..c632403c 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -222,9 +222,11 @@ if(ICEBERG_BUILD_BUNDLE)
   add_iceberg_test(data_test
                    USE_BUNDLE
                    SOURCES
+                   arrow_c_data_util_test.cc
                    data_writer_test.cc
                    delete_filter_test.cc
-                   delete_loader_test.cc)
+                   delete_loader_test.cc
+                   file_scan_task_reader_test.cc)
 
 endif()
 
diff --git a/src/iceberg/test/arrow_c_data_util_test.cc 
b/src/iceberg/test/arrow_c_data_util_test.cc
new file mode 100644
index 00000000..c93a928a
--- /dev/null
+++ b/src/iceberg/test/arrow_c_data_util_test.cc
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/// \file arrow_c_data_util_test.cc
+/// Verifies ProjectBatch behavior across registered implementations.
+
+#include <cstdint>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_register.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+
+namespace iceberg::internal {
+
+namespace {
+
+std::shared_ptr<::arrow::RecordBatch> MakeBatch(const Schema& schema,
+                                                std::string_view json) {
+  ArrowSchema c_schema;
+  EXPECT_THAT(ToArrowSchema(schema, &c_schema), IsOk());
+  // ImportSchema takes ownership of c_schema and calls release.
+  auto arrow_schema = ::arrow::ImportSchema(&c_schema).ValueOrDie();
+  auto struct_type = ::arrow::struct_(arrow_schema->fields());
+  return ::arrow::RecordBatch::FromStructArray(
+             ::arrow::json::ArrayFromJSONString(struct_type, std::string(json))
+                 .ValueOrDie())
+      .ValueOrDie();
+}
+
+ProjectionContext::ProjectBatchFunction ArrowComputeFunction() {
+  arrow::RegisterAll();
+  auto function = ProjectionContext::ResolveProjectBatchFunction();
+  EXPECT_NE(function, nullptr);
+  return function;
+}
+
+std::shared_ptr<::arrow::RecordBatch> RunProjectBatch(
+    const ::arrow::RecordBatch& batch, const std::vector<int32_t>& 
alive_indices,
+    const Schema& required_schema, const Schema& projected_schema,
+    ProjectionContext::ProjectBatchFunction project_batch_function) {
+  ArrowSchema c_schema;
+  ArrowArray c_array;
+  EXPECT_TRUE(::arrow::ExportRecordBatch(batch, &c_array, &c_schema).ok());
+  ArrowSchemaGuard schema_guard(&c_schema);
+  ArrowArrayGuard array_guard(&c_array);
+
+  auto projection =
+      ProjectionContext::Make(required_schema, projected_schema, 
project_batch_function);
+  EXPECT_THAT(projection, IsOk());
+
+  auto result = ProjectBatch(&c_array, alive_indices, projection.value());
+  EXPECT_THAT(result, IsOk());
+
+  ArrowSchema out_c_schema;
+  EXPECT_THAT(ToArrowSchema(projected_schema, &out_c_schema), IsOk());
+  auto arrow_out_schema = ::arrow::ImportSchema(&out_c_schema).ValueOrDie();
+
+  ArrowArray out_array = std::exchange(result.value(), ArrowArray{});
+  return ::arrow::ImportRecordBatch(&out_array, arrow_out_schema).ValueOrDie();
+}
+
+void ExpectProjectBatch(const ::arrow::RecordBatch& batch,
+                        const std::vector<int32_t>& alive_indices,
+                        const Schema& required_schema, const Schema& 
projected_schema,
+                        std::string_view expected_json) {
+  auto expected = MakeBatch(projected_schema, expected_json);
+  auto nanoarrow =
+      RunProjectBatch(batch, alive_indices, required_schema, projected_schema, 
nullptr);
+  auto arrow_compute = RunProjectBatch(batch, alive_indices, required_schema,
+                                       projected_schema, 
ArrowComputeFunction());
+
+  EXPECT_TRUE(nanoarrow->Equals(*expected)) << "nanoarrow:\n"
+                                            << nanoarrow->ToString() << 
"expected:\n"
+                                            << expected->ToString();
+  EXPECT_TRUE(arrow_compute->Equals(*expected))
+      << "arrow_compute:\n"
+      << arrow_compute->ToString() << "expected:\n"
+      << expected->ToString();
+  EXPECT_TRUE(nanoarrow->Equals(*arrow_compute))
+      << "nanoarrow:\n"
+      << nanoarrow->ToString() << "arrow_compute:\n"
+      << arrow_compute->ToString();
+}
+
+std::shared_ptr<Schema> MakeFullSchema() {
+  return std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", string()),
+                               SchemaField::MakeOptional(3, "score", 
float64())});
+}
+
+}  // namespace
+
+TEST(ProjectBatchTest, ProjectSelectedRowsWithoutColumnProjection) {
+  auto schema = MakeFullSchema();
+  auto batch = MakeBatch(*schema, 
R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0],[4,"d",4.0]])");
+  std::vector<int32_t> alive = {0, 2};
+
+  ExpectProjectBatch(*batch, alive, *schema, *schema, 
R"([[1,"a",1.0],[3,"c",3.0]])");
+}
+
+TEST(ProjectBatchTest, ProjectColumnsWithoutRowFiltering) {
+  auto full_schema = MakeFullSchema();
+  auto projected = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", 
string())});
+  auto batch = MakeBatch(*full_schema, 
R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0]])");
+  std::vector<int32_t> alive = {0, 1, 2};
+
+  ExpectProjectBatch(*batch, alive, *full_schema, *projected,
+                     R"([[1,"a"],[2,"b"],[3,"c"]])");
+}
+
+TEST(ProjectBatchTest, ProjectSelectedRowsAndReorderColumns) {
+  auto full_schema = MakeFullSchema();
+  // Reorder: score(3) before name(2), drop id(1).
+  auto projected = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeOptional(3, "score", 
float64()),
+                               SchemaField::MakeOptional(2, "name", 
string())});
+  auto batch = MakeBatch(*full_schema, 
R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0]])");
+  std::vector<int32_t> alive = {1, 2};
+
+  ExpectProjectBatch(*batch, alive, *full_schema, *projected, 
R"([[2.0,"b"],[3.0,"c"]])");
+}
+
+TEST(ProjectBatchTest, NullValues) {
+  auto schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", 
string())});
+  auto batch = MakeBatch(*schema, R"([[1,null],[2,"b"],[3,null]])");
+  std::vector<int32_t> alive = {0, 2};
+
+  ExpectProjectBatch(*batch, alive, *schema, *schema, 
R"([[1,null],[3,null]])");
+}
+
+TEST(ProjectBatchTest, EmptyRowSelection) {
+  auto schema = MakeFullSchema();
+  auto batch = MakeBatch(*schema, R"([[1,"a",1.0],[2,"b",2.0]])");
+  std::vector<int32_t> alive = {};
+
+  ExpectProjectBatch(*batch, alive, *schema, *schema, R"([])");
+}
+
+TEST(ProjectBatchTest, ProjectionRejectsNestedPruning) {
+  auto input_schema = Schema(std::vector<SchemaField>{
+      SchemaField::MakeOptional(1, "person",
+                                
std::make_shared<StructType>(std::vector<SchemaField>{
+                                    SchemaField::MakeOptional(2, "name", 
string()),
+                                    SchemaField::MakeOptional(3, "age", 
int32()),
+                                })),
+  });
+  auto output_schema = Schema(std::vector<SchemaField>{
+      SchemaField::MakeOptional(1, "person",
+                                
std::make_shared<StructType>(std::vector<SchemaField>{
+                                    SchemaField::MakeOptional(2, "name", 
string()),
+                                })),
+  });
+
+  auto projection = ProjectionContext::Make(input_schema, output_schema, 
nullptr);
+
+  EXPECT_THAT(projection, IsError(ErrorKind::kInvalidArgument));
+}
+
+}  // namespace iceberg::internal
diff --git a/src/iceberg/test/file_scan_task_reader_test.cc 
b/src/iceberg/test/file_scan_task_reader_test.cc
new file mode 100644
index 00000000..1630a108
--- /dev/null
+++ b/src/iceberg/test/file_scan_task_reader_test.cc
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/file_scan_task_reader.h"
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <arrow/table.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/metadata.h>
+#include <parquet/properties.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_register.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/data/equality_delete_writer.h"
+#include "iceberg/data/position_delete_writer.h"
+#include "iceberg/file_format.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/temp_file_test_base.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+struct ExportedBatch {
+  ArrowSchema schema{};
+  ArrowArray array{};
+
+  ~ExportedBatch() {
+    if (array.release != nullptr) {
+      array.release(&array);
+    }
+    if (schema.release != nullptr) {
+      schema.release(&schema);
+    }
+  }
+
+  ExportedBatch() = default;
+  ExportedBatch(const ExportedBatch&) = delete;
+  ExportedBatch& operator=(const ExportedBatch&) = delete;
+
+  ExportedBatch(ExportedBatch&& other) noexcept
+      : schema(other.schema), array(other.array) {
+    other.schema.release = nullptr;
+    other.array.release = nullptr;
+  }
+  ExportedBatch& operator=(ExportedBatch&& other) noexcept = delete;
+};
+
+}  // namespace
+
+class FileScanTaskReaderTest : public TempFileTestBase {
+ protected:
+  static void SetUpTestSuite() {
+    arrow::RegisterAll();
+    parquet::RegisterAll();
+  }
+
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
+    partition_spec_ = PartitionSpec::Unpartitioned();
+    table_schema_ = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", 
string()),
+                                 SchemaField::MakeOptional(3, "category", 
string())},
+        /*schema_id=*/2);
+    projected_schema_ = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", 
string())},
+        table_schema_->schema_id());
+  }
+
+  Result<ExportedBatch> MakeBatch(const Schema& schema,
+                                  const std::string& json_data) const {
+    ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(schema));
+    auto struct_type = ::arrow::struct_(arrow_schema->fields());
+    auto array_result = ::arrow::json::ArrayFromJSONString(struct_type, 
json_data);
+    if (!array_result.ok()) {
+      return UnknownError(array_result.status().ToString());
+    }
+
+    ExportedBatch batch;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &batch.schema));
+    auto export_status =
+        ::arrow::ExportArray(*array_result.MoveValueUnsafe(), &batch.array);
+    if (!export_status.ok()) {
+      return UnknownError(export_status.ToString());
+    }
+    return std::move(batch);
+  }
+
+  Result<std::shared_ptr<::arrow::Schema>> MakeArrowSchema(const Schema& 
schema) const {
+    ArrowSchema c_schema;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &c_schema));
+    auto arrow_schema_result = ::arrow::ImportSchema(&c_schema);
+    if (!arrow_schema_result.ok()) {
+      return UnknownError(arrow_schema_result.status().ToString());
+    }
+    return arrow_schema_result.MoveValueUnsafe();
+  }
+
+  Result<std::shared_ptr<::arrow::RecordBatch>> MakeRecordBatch(
+      const std::shared_ptr<::arrow::Schema>& arrow_schema,
+      const std::string& json_data) const {
+    auto struct_type = ::arrow::struct_(arrow_schema->fields());
+    auto array_result = ::arrow::json::ArrayFromJSONString(struct_type, 
json_data);
+    if (!array_result.ok()) {
+      return UnknownError(array_result.status().ToString());
+    }
+
+    auto batch_result =
+        ::arrow::RecordBatch::FromStructArray(array_result.MoveValueUnsafe());
+    if (!batch_result.ok()) {
+      return UnknownError(batch_result.status().ToString());
+    }
+    return batch_result.MoveValueUnsafe();
+  }
+
+  Result<std::string> CreateParquetDataFile(std::shared_ptr<Schema> schema,
+                                            const std::string& json_data,
+                                            int64_t row_group_size = 1024) {
+    auto path = CreateNewTempFilePathWithSuffix(".parquet");
+
+    ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(*schema));
+    ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeRecordBatch(arrow_schema, 
json_data));
+    auto table_result =
+        ::arrow::Table::FromRecordBatches(arrow_schema, {std::move(batch)});
+    if (!table_result.ok()) {
+      return UnknownError(table_result.status().ToString());
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto outfile, 
arrow::OpenArrowOutputStream(file_io_, path));
+    auto write_status = 
::parquet::arrow::WriteTable(*table_result.MoveValueUnsafe(),
+                                                     
::arrow::default_memory_pool(),
+                                                     outfile, row_group_size);
+    if (!write_status.ok()) {
+      return UnknownError(write_status.ToString());
+    }
+    if (auto close_status = outfile->Close(); !close_status.ok()) {
+      return UnknownError(close_status.ToString());
+    }
+    return path;
+  }
+
+  Result<std::string> CreateParquetDataFile(std::shared_ptr<Schema> schema,
+                                            const std::vector<std::string>& 
json_batches,
+                                            int64_t max_row_group_length) {
+    ICEBERG_PRECHECK(!json_batches.empty(), "Parquet data file must have a 
batch");
+
+    auto path = CreateNewTempFilePathWithSuffix(".parquet");
+    ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(*schema));
+    ICEBERG_ASSIGN_OR_RAISE(auto outfile, 
arrow::OpenArrowOutputStream(file_io_, path));
+
+    auto properties_builder = ::parquet::WriterProperties::Builder();
+    auto writer_properties =
+        properties_builder.compression(::parquet::Compression::UNCOMPRESSED)
+            ->max_row_group_length(max_row_group_length)
+            ->build();
+    auto writer_result = ::parquet::arrow::FileWriter::Open(
+        *arrow_schema, ::arrow::default_memory_pool(), outfile, 
writer_properties);
+    if (!writer_result.ok()) {
+      return UnknownError(writer_result.status().ToString());
+    }
+    auto writer = writer_result.MoveValueUnsafe();
+
+    for (const auto& json_batch : json_batches) {
+      ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeRecordBatch(arrow_schema, 
json_batch));
+      if (auto write_status = writer->WriteRecordBatch(*batch); 
!write_status.ok()) {
+        return UnknownError(write_status.ToString());
+      }
+    }
+    if (auto close_status = writer->Close(); !close_status.ok()) {
+      return UnknownError(close_status.ToString());
+    }
+    ICEBERG_PRECHECK(
+        writer->metadata()->num_row_groups() == 
static_cast<int>(json_batches.size()),
+        "Expected {} Parquet row groups, got {}", json_batches.size(),
+        writer->metadata()->num_row_groups());
+    if (auto close_status = outfile->Close(); !close_status.ok()) {
+      return UnknownError(close_status.ToString());
+    }
+    return path;
+  }
+
+  Result<std::shared_ptr<DataFile>> MakeDataFile(std::shared_ptr<Schema> 
schema,
+                                                 const std::string& json_data,
+                                                 int64_t record_count = 3) {
+    ICEBERG_ASSIGN_OR_RAISE(auto path,
+                            CreateParquetDataFile(std::move(schema), 
json_data));
+    ICEBERG_ASSIGN_OR_RAISE(auto input_file, file_io_->NewInputFile(path));
+    ICEBERG_ASSIGN_OR_RAISE(auto size, input_file->Size());
+    return std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kData,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .record_count = record_count,
+        .file_size_in_bytes = size,
+    });
+  }
+
+  Result<std::shared_ptr<DataFile>> MakeDataFile(
+      std::shared_ptr<Schema> schema, const std::vector<std::string>& 
json_batches,
+      int64_t record_count, int64_t max_row_group_length) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto path,
+        CreateParquetDataFile(std::move(schema), json_batches, 
max_row_group_length));
+    ICEBERG_ASSIGN_OR_RAISE(auto input_file, file_io_->NewInputFile(path));
+    ICEBERG_ASSIGN_OR_RAISE(auto size, input_file->Size());
+    return std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kData,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .record_count = record_count,
+        .file_size_in_bytes = size,
+    });
+  }
+
+  Result<std::shared_ptr<DataFile>> MakePositionDeleteFile(
+      const std::string& path, const std::vector<int64_t>& positions,
+      const std::string& data_path) {
+    PositionDeleteWriterOptions options{
+        .path = path,
+        .schema = table_schema_,
+        .spec = partition_spec_,
+        .partition = PartitionValues{},
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .flush_threshold = 10000,
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer, PositionDeleteWriter::Make(options));
+    for (int64_t pos : positions) {
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteDelete(data_path, pos));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata());
+    return metadata.data_files[0];
+  }
+
+  Result<std::shared_ptr<DataFile>> MakeEqualityDeleteFile(
+      const std::string& path, std::shared_ptr<Schema> schema,
+      const std::string& json_data, std::vector<int32_t> equality_field_ids) {
+    EqualityDeleteWriterOptions options{
+        .path = path,
+        .schema = schema,
+        .spec = partition_spec_,
+        .partition = PartitionValues{},
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .equality_field_ids = std::move(equality_field_ids),
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer, EqualityDeleteWriter::Make(options));
+    ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeBatch(*schema, json_data));
+    ICEBERG_RETURN_UNEXPECTED(writer->Write(&batch.array));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata());
+    return metadata.data_files[0];
+  }
+
+  void VerifyStream(struct ArrowArrayStream* stream, std::string_view 
expected_json) {
+    auto record_batch_reader = 
::arrow::ImportRecordBatchReader(stream).ValueOrDie();
+
+    auto result = record_batch_reader->Next();
+    ASSERT_TRUE(result.ok()) << result.status().message();
+    auto actual_batch = result.ValueOrDie();
+    ASSERT_NE(actual_batch, nullptr) << "Stream is exhausted but expected more 
data.";
+
+    auto struct_type = ::arrow::struct_(actual_batch->schema()->fields());
+    auto expected_array =
+        ::arrow::json::ArrayFromJSONString(struct_type, 
expected_json).ValueOrDie();
+    auto expected_batch =
+        ::arrow::RecordBatch::FromStructArray(expected_array).ValueOrDie();
+
+    ASSERT_TRUE(actual_batch->Equals(*expected_batch))
+        << "Actual batch:\n"
+        << actual_batch->ToString() << "\nExpected batch:\n"
+        << expected_batch->ToString();
+
+    result = record_batch_reader->Next();
+    ASSERT_TRUE(result.ok()) << result.status().message();
+    ASSERT_EQ(result.ValueOrDie(), nullptr) << "Reader returned an extra 
batch.";
+  }
+
+  void VerifyDataReaderBatchLengths(const DataFile& data_file,
+                                    const std::vector<int64_t>& 
expected_lengths,
+                                    ReaderProperties properties = {}) {
+    ReaderOptions options{
+        .path = data_file.file_path,
+        .length = static_cast<size_t>(data_file.file_size_in_bytes),
+        .io = file_io_,
+        .projection = table_schema_,
+        .properties = std::move(properties),
+    };
+    ICEBERG_UNWRAP_OR_FAIL(auto reader,
+                           ReaderFactoryRegistry::Open(data_file.file_format, 
options));
+
+    for (int64_t expected_length : expected_lengths) {
+      ICEBERG_UNWRAP_OR_FAIL(auto maybe_batch, reader->Next());
+      ASSERT_TRUE(maybe_batch.has_value()) << "Reader is exhausted too early.";
+
+      ArrowArray batch = std::move(maybe_batch.value());
+      internal::ArrowArrayGuard batch_guard(&batch);
+      ASSERT_EQ(batch.length, expected_length);
+    }
+
+    ICEBERG_UNWRAP_OR_FAIL(auto maybe_batch, reader->Next());
+    ASSERT_FALSE(maybe_batch.has_value()) << "Reader returned an extra batch.";
+    ASSERT_THAT(reader->Close(), IsOk());
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<PartitionSpec> partition_spec_;
+  std::shared_ptr<Schema> table_schema_;
+  std::shared_ptr<Schema> projected_schema_;
+};
+
+TEST_F(FileScanTaskReaderTest, OpenWithoutDeletesReadsProjectedSchema) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", 
"green"]])"));
+  FileScanTask task(data_file);
+
+  FileScanTaskReader::Options options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, 
FileScanTaskReader::Make(std::move(options)));
+  auto stream_result = reader->Open(task);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyStream(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, OpenWithPositionDeletesFiltersRowsAndPrunesPos) 
{
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", 
"green"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto pos_delete, 
MakePositionDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"),
+                                              {1}, data_file->file_path));
+  FileScanTask task(data_file, {pos_delete});
+
+  FileScanTaskReader::Options options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, 
FileScanTaskReader::Make(std::move(options)));
+  auto stream_result = reader->Open(task);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, 
"Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, 
OpenWithEqualityDeletesAddsAndPrunesDeleteOnlyColumns) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", 
"green"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_delete,
+      MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), 
table_schema_,
+                             R"([[0, "unused", "red"]])", {3}));
+  FileScanTask task(data_file, {eq_delete});
+
+  FileScanTaskReader::Options options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, 
FileScanTaskReader::Make(std::move(options)));
+  auto stream_result = reader->Open(task);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, 
"Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, 
OpenWithEqualityDeletesKeepsInputBatchWhenAllRowsAlive) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", 
"green"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_delete,
+      MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), 
table_schema_,
+                             R"([[99, "unused", "unused"]])", {1}));
+  FileScanTask task(data_file, {eq_delete});
+
+  FileScanTaskReader::Options options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, 
FileScanTaskReader::Make(std::move(options)));
+  auto stream_result = reader->Open(task);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyStream(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, OpenWithSchemasResolvesDroppedEqualityField) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", string())},
+      /*schema_id=*/2);
+  auto old_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", string()),
+                               SchemaField::MakeOptional(4, "dropped_value", 
string())},
+      /*schema_id=*/1);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(old_schema,
+                   R"([[1, "Foo", "keep"], [2, "Bar", "gone"], [3, "Baz", 
"keep"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_delete,
+      MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), 
old_schema,
+                             R"([[0, "unused", "gone"]])", {4}));
+  FileScanTask task(data_file, {eq_delete});
+
+  FileScanTaskReader::Options options{
+      .io = file_io_,
+      .table_schema = current_schema,
+      .schemas = {current_schema, old_schema},
+      .projected_schema = current_schema,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, 
FileScanTaskReader::Make(std::move(options)));
+  auto stream_result = reader->Open(task);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, 
"Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, OpenWithMixedDeletesSkipsFullyDeletedBatches) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   std::vector<std::string>{R"([[1, "Foo", "blue"], [2, "Bar", 
"red"]])",
+                                            R"([[3, "Baz", "green"]])"},
+                   /*record_count=*/3, /*max_row_group_length=*/2));
+  // Parquet can coalesce row groups when the reader batch size is larger.
+  ReaderProperties properties;
+  properties.Set(ReaderProperties::kBatchSize, int64_t{2});
+  ASSERT_NO_FATAL_FAILURE(VerifyDataReaderBatchLengths(*data_file, {2, 1}, 
properties));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto pos_delete, 
MakePositionDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"),
+                                              {0, 1}, data_file->file_path));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_delete,
+      MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), 
table_schema_,
+                             R"([[0, "unused", "yellow"]])", {3}));
+  FileScanTask task(data_file, {pos_delete, eq_delete});
+
+  FileScanTaskReader::Options options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+      .properties = properties.configs(),
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, 
FileScanTaskReader::Make(std::move(options)));
+  auto stream_result = reader->Open(task);
+  ASSERT_THAT(stream_result, IsOk());
+  auto stream = std::move(stream_result.value());
+
+  ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[3, "Baz"]])"));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/file_scan_task_test.cc 
b/src/iceberg/test/file_scan_task_test.cc
index 55bc6a11..cb945ba1 100644
--- a/src/iceberg/test/file_scan_task_test.cc
+++ b/src/iceberg/test/file_scan_task_test.cc
@@ -28,6 +28,7 @@
 #include <parquet/metadata.h>
 
 #include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/data/file_scan_task_reader.h"
 #include "iceberg/file_format.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/parquet/parquet_register.h"
@@ -109,6 +110,20 @@ class FileScanTaskTest : public TempFileTestBase {
     return data_file;
   }
 
+  Result<ArrowArrayStream> OpenTask(const FileScanTask& task,
+                                    std::shared_ptr<Schema> projected_schema) {
+    auto current_schema = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", 
string())});
+    FileScanTaskReader::Options options{
+        .io = file_io_,
+        .table_schema = current_schema,
+        .projected_schema = std::move(projected_schema),
+    };
+    ICEBERG_ASSIGN_OR_RAISE(auto reader, 
FileScanTaskReader::Make(std::move(options)));
+    return reader->Open(task);
+  }
+
   // Helper method to verify the content of the next batch from an 
ArrowArrayStream.
   void VerifyStreamNextBatch(struct ArrowArrayStream* stream,
                              std::string_view expected_json) {
@@ -154,9 +169,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) {
 
   FileScanTask task(data_file);
 
-  auto stream_result = task.ToArrow(file_io_, projected_schema);
-  ASSERT_THAT(stream_result, IsOk());
-  auto stream = std::move(stream_result.value());
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema));
 
   ASSERT_NO_FATAL_FAILURE(
       VerifyStreamNextBatch(&stream, R"([[1, "Foo"], [2, "Bar"], [3, 
"Baz"]])"));
@@ -171,9 +184,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
 
   FileScanTask task(data_file);
 
-  auto stream_result = task.ToArrow(file_io_, projected_schema);
-  ASSERT_THAT(stream_result, IsOk());
-  auto stream = std::move(stream_result.value());
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema));
 
   ASSERT_NO_FATAL_FAILURE(
       VerifyStreamNextBatch(&stream, R"([["Foo", null], ["Bar", null], ["Baz", 
null]])"));
@@ -188,9 +199,7 @@ TEST_F(FileScanTaskTest, ReadEmptyFile) {
 
   FileScanTask task(data_file);
 
-  auto stream_result = task.ToArrow(file_io_, projected_schema);
-  ASSERT_THAT(stream_result, IsOk());
-  auto stream = std::move(stream_result.value());
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema));
 
   // The stream should be immediately exhausted
   ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream));


Reply via email to