This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 3d2537cd feat(data): add MOR file scan task reader (#657)
3d2537cd is described below
commit 3d2537cd3bebf3c9d5d72961e42e749cc3d42f5e
Author: Gang Wu <[email protected]>
AuthorDate: Fri May 22 15:20:33 2026 +0800
feat(data): add MOR file scan task reader (#657)
Add FileScanTaskReader as the delete-aware Arrow stream entrypoint for
file scan tasks, covering both no-delete reader passthrough and
merge-on-read filtering with position and equality deletes.
Introduce reusable Arrow C Data utilities for stream wrapping and batch
projection, including ProjectionContext caching and optional Arrow
compute registration via arrow::RegisterAll. Move Arrow IO registration
into arrow_register and remove FileScanTask::ToArrow.
Add coverage for projection behavior across nanoarrow and Arrow compute
paths, plus end-to-end FileScanTaskReader tests for projected reads,
position deletes, equality deletes, dropped equality fields, and fully
deleted batches.
---
src/iceberg/CMakeLists.txt | 5 +-
src/iceberg/arrow/arrow_c_data_util.cc | 131 ++++++
.../{arrow_io_register.cc => arrow_register.cc} | 52 +-
.../{arrow_io_register.h => arrow_register.h} | 8 +-
src/iceberg/arrow_c_data_util.cc | 397 ++++++++++++++++
src/iceberg/arrow_c_data_util_internal.h | 245 ++++++++++
src/iceberg/data/delete_filter.cc | 6 +-
src/iceberg/data/file_scan_task_reader.cc | 235 +++++++++
src/iceberg/data/file_scan_task_reader.h | 81 ++++
src/iceberg/data/meson.build | 1 +
src/iceberg/file_reader.h | 2 +-
src/iceberg/meson.build | 2 +
src/iceberg/table_scan.cc | 118 -----
src/iceberg/table_scan.h | 11 -
src/iceberg/test/CMakeLists.txt | 4 +-
src/iceberg/test/arrow_c_data_util_test.cc | 194 ++++++++
src/iceberg/test/file_scan_task_reader_test.cc | 524 +++++++++++++++++++++
src/iceberg/test/file_scan_task_test.cc | 27 +-
18 files changed, 1877 insertions(+), 166 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 68cacebe..2b02b999 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -18,6 +18,7 @@
set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
set(ICEBERG_SOURCES
+ arrow_c_data_util.cc
arrow_c_data_guard_internal.cc
catalog/memory/in_memory_catalog.cc
delete_file_index.cc
@@ -166,6 +167,7 @@ set(ICEBERG_DATA_SOURCES
data/delete_filter.cc
data/delete_loader.cc
data/equality_delete_writer.cc
+ data/file_scan_task_reader.cc
data/position_delete_writer.cc
data/writer.cc
deletes/position_delete_index.cc
@@ -222,9 +224,10 @@ add_subdirectory(util)
if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES
+ arrow/arrow_c_data_util.cc
arrow/arrow_io.cc
arrow/s3/arrow_s3_file_io.cc
- arrow/arrow_io_register.cc
+ arrow/arrow_register.cc
arrow/metadata_column_util.cc
avro/avro_data_util.cc
avro/avro_direct_decoder.cc
diff --git a/src/iceberg/arrow/arrow_c_data_util.cc
b/src/iceberg/arrow/arrow_c_data_util.cc
new file mode 100644
index 00000000..ea52d64c
--- /dev/null
+++ b/src/iceberg/arrow/arrow_c_data_util.cc
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <utility>
+#include <vector>
+
+#include <arrow/array/array_primitive.h>
+#include <arrow/buffer.h>
+#include <arrow/c/bridge.h>
+#include <arrow/compute/api_vector.h>
+#include <arrow/record_batch.h>
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/result.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+struct ArrowProjectBatchState {
+ std::shared_ptr<::arrow::Schema> input_schema;
+ std::shared_ptr<::arrow::Schema> output_schema;
+};
+
+Result<std::shared_ptr<::arrow::Schema>> ImportArrowSchema(
+ const ArrowSchema& arrow_schema) {
+ ArrowSchema schema_copy;
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowSchemaDeepCopy(&arrow_schema,
&schema_copy));
+ internal::ArrowSchemaGuard schema_copy_guard(&schema_copy);
+
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(auto schema,
::arrow::ImportSchema(&schema_copy));
+ return schema;
+}
+
+Result<std::shared_ptr<ArrowProjectBatchState>> GetArrowProjectBatchState(
+ ProjectionContext& projection) {
+ auto state =
+
std::static_pointer_cast<ArrowProjectBatchState>(projection.project_batch_state());
+ if (state != nullptr) {
+ return state;
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto input_schema,
+ ImportArrowSchema(projection.input_arrow_schema()));
+ ICEBERG_ASSIGN_OR_RAISE(auto output_schema,
+ ImportArrowSchema(projection.output_arrow_schema()));
+
+ state = std::make_shared<ArrowProjectBatchState>(
+ ArrowProjectBatchState{.input_schema = std::move(input_schema),
+ .output_schema = std::move(output_schema)});
+ projection.project_batch_state() = state;
+ return state;
+}
+
+Result<ArrowArray> ProjectBatchArrowCompute(ArrowArray* input_batch,
+ std::span<const int32_t>
row_indices,
+ ProjectionContext& projection) {
+ ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
+ ICEBERG_ASSIGN_OR_RAISE(auto state, GetArrowProjectBatchState(projection));
+
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(
+ auto input_record_batch,
+ ::arrow::ImportRecordBatch(input_batch, state->input_schema));
+
+ const int32_t empty_index = 0;
+ // Buffer::Wrap needs a valid pointer even when the zero-length buffer is
never read.
+ const int32_t* row_indices_data =
+ row_indices.empty() ? &empty_index : row_indices.data();
+ auto index_array = std::make_shared<::arrow::Int32Array>(
+ static_cast<int64_t>(row_indices.size()),
+ ::arrow::Buffer::Wrap(row_indices_data, row_indices.size()));
+
+ std::vector<std::shared_ptr<::arrow::Array>> output_columns;
+ output_columns.reserve(projection.selected_field_indices().size());
+ for (int32_t input_index : projection.selected_field_indices()) {
+ ICEBERG_PRECHECK(input_index >= 0 && input_index <
input_record_batch->num_columns(),
+ "Input field index {} out of range for batch with {}
columns",
+ input_index, input_record_batch->num_columns());
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(
+ auto taken_column,
+ ::arrow::compute::Take(*input_record_batch->column(input_index),
*index_array));
+ output_columns.push_back(std::move(taken_column));
+ }
+
+ auto output_record_batch = ::arrow::RecordBatch::Make(
+ state->output_schema, static_cast<int64_t>(row_indices.size()),
+ std::move(output_columns));
+
+ ArrowArray output_array;
+ ICEBERG_ARROW_RETURN_NOT_OK(
+ ::arrow::ExportRecordBatch(*output_record_batch, &output_array));
+ internal::ArrowArrayGuard output_array_guard(&output_array);
+
+ return std::exchange(output_array, ArrowArray{});
+}
+
+} // namespace
+
+void RegisterArrowProjectBatch() {
+ static std::once_flag flag;
+ std::call_once(flag, []() {
+ ProjectionContext::RegisterProjectBatchFunction(&ProjectBatchArrowCompute);
+ });
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/arrow/arrow_io_register.cc
b/src/iceberg/arrow/arrow_register.cc
similarity index 50%
rename from src/iceberg/arrow/arrow_io_register.cc
rename to src/iceberg/arrow/arrow_register.cc
index 43273c0a..d2983ffa 100644
--- a/src/iceberg/arrow/arrow_io_register.cc
+++ b/src/iceberg/arrow/arrow_register.cc
@@ -1,28 +1,35 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "iceberg/arrow/arrow_io_register.h"
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/arrow/arrow_register.h"
#include <mutex>
#include <string>
+#include <unordered_map>
#include "iceberg/arrow/arrow_io_util.h"
#include "iceberg/file_io_registry.h"
namespace iceberg {
// Forward declaration of the hook defined in iceberg/arrow/arrow_c_data_util.cc.
// It registers the Arrow compute ProjectBatch backend.
void RegisterArrowProjectBatch();
}  // namespace iceberg
+
namespace iceberg::arrow {
namespace {
@@ -43,8 +50,6 @@ void RegisterS3FileIO() {
#endif
}
-} // namespace
-
void EnsureArrowFileIOsRegistered() {
static std::once_flag flag;
std::call_once(flag, []() {
@@ -58,4 +63,11 @@ void EnsureArrowFileIOsRegistered() {
return true;
}();
+} // namespace
+
+void RegisterAll() {
+ EnsureArrowFileIOsRegistered();
+ ::iceberg::RegisterArrowProjectBatch();
+}
+
} // namespace iceberg::arrow
diff --git a/src/iceberg/arrow/arrow_io_register.h
b/src/iceberg/arrow/arrow_register.h
similarity index 79%
rename from src/iceberg/arrow/arrow_io_register.h
rename to src/iceberg/arrow/arrow_register.h
index f28b7a56..7a67be51 100644
--- a/src/iceberg/arrow/arrow_io_register.h
+++ b/src/iceberg/arrow/arrow_register.h
@@ -19,16 +19,16 @@
#pragma once
-/// \file iceberg/arrow/arrow_io_register.h
-/// \brief Provide functions to register Arrow FileIO implementations.
+/// \file iceberg/arrow/arrow_register.h
+/// \brief Provide functions to register Arrow bundle integrations.
#include "iceberg/iceberg_bundle_export.h"
namespace iceberg::arrow {
-/// \brief Register built-in Arrow FileIO implementations into the
FileIORegistry.
+/// \brief Register Arrow FileIOs and Arrow-backed C Data utilities.
///
/// This operation is idempotent and safe to call multiple times.
-ICEBERG_BUNDLE_EXPORT void EnsureArrowFileIOsRegistered();
+ICEBERG_BUNDLE_EXPORT void RegisterAll();
} // namespace iceberg::arrow
diff --git a/src/iceberg/arrow_c_data_util.cc b/src/iceberg/arrow_c_data_util.cc
new file mode 100644
index 00000000..a1d765d2
--- /dev/null
+++ b/src/iceberg/arrow_c_data_util.cc
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <mutex>
+#include <span>
+#include <utility>
+#include <vector>
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
+ return MakeArrowArrayStream<Reader>(std::move(reader));
+}
+
+namespace {
+
+Result<size_t> FindFieldIndexById(std::span<const SchemaField> fields, int32_t
field_id) {
+ for (size_t index = 0; index < fields.size(); ++index) {
+ if (fields[index].field_id() == field_id) {
+ return index;
+ }
+ }
+ return InvalidArgument("Required schema does not contain projected field id
{}",
+ field_id);
+}
+
+std::mutex g_project_batch_function_mutex;
+ProjectionContext::ProjectBatchFunction g_project_batch_function = nullptr;
+
+ProjectionContext::ProjectBatchFunction GetProjectBatchFunction() {
+ std::lock_guard lock(g_project_batch_function_mutex);
+ return g_project_batch_function;
+}
+
+Result<std::vector<int32_t>> BuildSelectedFieldIndices(
+ std::span<const SchemaField> input_fields,
+ std::span<const SchemaField> output_fields) {
+ std::vector<int32_t> selected_field_indices;
+ selected_field_indices.reserve(output_fields.size());
+
+ for (const auto& output_field : output_fields) {
+ ICEBERG_ASSIGN_OR_RAISE(auto input_index,
+ FindFieldIndexById(input_fields,
output_field.field_id()));
+ const auto& input_field = input_fields[input_index];
+ if (*input_field.type() != *output_field.type()) {
+ return InvalidArgument(
+ "ProjectBatch only supports complete top-level fields, but field id "
+ "{} changes type from {} to {}",
+ output_field.field_id(), input_field.type()->ToString(),
+ output_field.type()->ToString());
+ }
+ ICEBERG_PRECHECK(
+ input_index <=
static_cast<size_t>(std::numeric_limits<int32_t>::max()),
+ "Input field index {} exceeds int32 range", input_index);
+ selected_field_indices.push_back(static_cast<int32_t>(input_index));
+ }
+
+ return selected_field_indices;
+}
+
+Status AppendValue(const ArrowSchema& input_schema, const ArrowArray&
input_array,
+ const ArrowArrayView& input_view, int64_t row_index,
+ ArrowArray* output_array);
+
+Status AppendListValues(const ArrowSchema& input_schema, const ArrowArray&
input_array,
+ const ArrowArrayView& input_view, int64_t row_index,
+ ArrowArray* output_array) {
+ const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index);
+ const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index +
1);
+ for (int64_t element_index = begin; element_index < end; ++element_index) {
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendValue(*input_schema.children[0], *input_array.children[0],
+ *input_view.children[0], element_index,
output_array->children[0]));
+ }
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+ return {};
+}
+
+Status AppendMapValues(const ArrowSchema& input_schema, const ArrowArray&
input_array,
+ const ArrowArrayView& input_view, int64_t row_index,
+ ArrowArray* output_array) {
+ const int64_t begin = ArrowArrayViewListChildOffset(&input_view, row_index);
+ const int64_t end = ArrowArrayViewListChildOffset(&input_view, row_index +
1);
+ for (int64_t entry_index = begin; entry_index < end; ++entry_index) {
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendValue(*input_schema.children[0], *input_array.children[0],
+ *input_view.children[0], entry_index,
output_array->children[0]));
+ }
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+ return {};
+}
+
+Status AppendDecimal(const ArrowSchema& input_schema, const ArrowArrayView&
input_view,
+ int64_t row_index, ArrowArray* output_array) {
+ ArrowError error;
+ ArrowSchemaView schema_view;
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowSchemaViewInit(&schema_view, &input_schema, &error), error);
+
+ ArrowDecimal value;
+ ArrowDecimalInit(&value, schema_view.decimal_bitwidth,
schema_view.decimal_precision,
+ schema_view.decimal_scale);
+ ArrowArrayViewGetDecimalUnsafe(&input_view, row_index, &value);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDecimal(output_array,
&value));
+ return {};
+}
+
+Status AppendValue(const ArrowSchema& input_schema, const ArrowArray&
input_array,
+ const ArrowArrayView& input_view, int64_t row_index,
+ ArrowArray* output_array) {
+ if (ArrowArrayViewIsNull(&input_view, row_index)) {
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array, 1));
+ return {};
+ }
+
+ switch (input_view.storage_type) {
+ case NANOARROW_TYPE_NA:
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(output_array,
1));
+ return {};
+ case NANOARROW_TYPE_BOOL:
+ case NANOARROW_TYPE_INT8:
+ case NANOARROW_TYPE_INT16:
+ case NANOARROW_TYPE_INT32:
+ case NANOARROW_TYPE_INT64:
+ case NANOARROW_TYPE_DATE32:
+ case NANOARROW_TYPE_DATE64:
+ case NANOARROW_TYPE_TIME32:
+ case NANOARROW_TYPE_TIME64:
+ case NANOARROW_TYPE_TIMESTAMP:
+ case NANOARROW_TYPE_DURATION:
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(
+ output_array, ArrowArrayViewGetIntUnsafe(&input_view, row_index)));
+ return {};
+ case NANOARROW_TYPE_UINT8:
+ case NANOARROW_TYPE_UINT16:
+ case NANOARROW_TYPE_UINT32:
+ case NANOARROW_TYPE_UINT64:
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt(
+ output_array, ArrowArrayViewGetUIntUnsafe(&input_view, row_index)));
+ return {};
+ case NANOARROW_TYPE_HALF_FLOAT:
+ case NANOARROW_TYPE_FLOAT:
+ case NANOARROW_TYPE_DOUBLE:
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble(
+ output_array, ArrowArrayViewGetDoubleUnsafe(&input_view,
row_index)));
+ return {};
+ case NANOARROW_TYPE_STRING:
+ case NANOARROW_TYPE_LARGE_STRING:
+ case NANOARROW_TYPE_STRING_VIEW: {
+ auto value = ArrowArrayViewGetStringUnsafe(&input_view, row_index);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(output_array,
value));
+ return {};
+ }
+ case NANOARROW_TYPE_BINARY:
+ case NANOARROW_TYPE_LARGE_BINARY:
+ case NANOARROW_TYPE_FIXED_SIZE_BINARY:
+ case NANOARROW_TYPE_BINARY_VIEW: {
+ auto value = ArrowArrayViewGetBytesUnsafe(&input_view, row_index);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(output_array,
value));
+ return {};
+ }
+ case NANOARROW_TYPE_DECIMAL128:
+ case NANOARROW_TYPE_DECIMAL256:
+ return AppendDecimal(input_schema, input_view, row_index, output_array);
+ case NANOARROW_TYPE_STRUCT: {
+ for (int64_t child_index = 0; child_index < input_schema.n_children;
+ ++child_index) {
+ ICEBERG_RETURN_UNEXPECTED(AppendValue(
+ *input_schema.children[child_index],
*input_array.children[child_index],
+ *input_view.children[child_index], row_index,
+ output_array->children[child_index]));
+ }
+
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(output_array));
+ return {};
+ }
+ case NANOARROW_TYPE_LIST:
+ case NANOARROW_TYPE_LARGE_LIST:
+ case NANOARROW_TYPE_FIXED_SIZE_LIST:
+ return AppendListValues(input_schema, input_array, input_view, row_index,
+ output_array);
+ case NANOARROW_TYPE_MAP:
+ return AppendMapValues(input_schema, input_array, input_view, row_index,
+ output_array);
+ default:
+ return NotImplemented("Unsupported Arrow type for merge-on-read
projection: {}",
+ static_cast<int>(input_view.storage_type));
+ }
+}
+
+} // namespace
+
+ProjectionContext::ProjectionContext(ProjectionContext&& other) noexcept
+ : input_schema_(std::exchange(other.input_schema_, nullptr)),
+ output_schema_(std::exchange(other.output_schema_, nullptr)),
+ selected_field_indices_(std::move(other.selected_field_indices_)),
+ input_arrow_schema_(other.input_arrow_schema_),
+ output_arrow_schema_(other.output_arrow_schema_),
+ project_batch_function_(std::exchange(other.project_batch_function_,
nullptr)),
+ project_batch_state_(std::move(other.project_batch_state_)) {
+ other.input_arrow_schema_.release = nullptr;
+ other.output_arrow_schema_.release = nullptr;
+}
+
+ProjectionContext& ProjectionContext::operator=(ProjectionContext&& other)
noexcept {
+ if (this == &other) {
+ return *this;
+ }
+
+ if (input_arrow_schema_.release != nullptr) {
+ input_arrow_schema_.release(&input_arrow_schema_);
+ }
+ if (output_arrow_schema_.release != nullptr) {
+ output_arrow_schema_.release(&output_arrow_schema_);
+ }
+
+ input_schema_ = std::exchange(other.input_schema_, nullptr);
+ output_schema_ = std::exchange(other.output_schema_, nullptr);
+ selected_field_indices_ = std::move(other.selected_field_indices_);
+ input_arrow_schema_ = other.input_arrow_schema_;
+ other.input_arrow_schema_.release = nullptr;
+ output_arrow_schema_ = other.output_arrow_schema_;
+ other.output_arrow_schema_.release = nullptr;
+ project_batch_function_ = std::exchange(other.project_batch_function_,
nullptr);
+ project_batch_state_ = std::move(other.project_batch_state_);
+ return *this;
+}
+
+ProjectionContext::~ProjectionContext() {
+ if (input_arrow_schema_.release != nullptr) {
+ input_arrow_schema_.release(&input_arrow_schema_);
+ }
+ if (output_arrow_schema_.release != nullptr) {
+ output_arrow_schema_.release(&output_arrow_schema_);
+ }
+}
+
+Result<ProjectionContext> ProjectionContext::Make(
+ const Schema& input_schema, const Schema& output_schema,
+ ProjectionContext::ProjectBatchFunction project_batch_function) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto selected_field_indices,
+ BuildSelectedFieldIndices(input_schema.fields(),
output_schema.fields()));
+
+ ProjectionContext context;
+ context.input_schema_ = &input_schema;
+ context.output_schema_ = &output_schema;
+ context.selected_field_indices_ = std::move(selected_field_indices);
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(input_schema,
&context.input_arrow_schema_));
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(output_schema,
&context.output_arrow_schema_));
+ context.project_batch_function_ = project_batch_function;
+
+ return context;
+}
+
+const Schema& ProjectionContext::input_schema() const { return *input_schema_;
}
+
+const Schema& ProjectionContext::output_schema() const { return
*output_schema_; }
+
+const ArrowSchema& ProjectionContext::input_arrow_schema() const {
+ return input_arrow_schema_;
+}
+
+const ArrowSchema& ProjectionContext::output_arrow_schema() const {
+ return output_arrow_schema_;
+}
+
+std::span<const int32_t> ProjectionContext::selected_field_indices() const {
+ return selected_field_indices_;
+}
+
+ProjectionContext::ProjectBatchFunction
ProjectionContext::project_batch_function()
+ const {
+ return project_batch_function_;
+}
+
+ProjectionContext::ProjectBatchState& ProjectionContext::project_batch_state()
{
+ return project_batch_state_;
+}
+
+void ProjectionContext::RegisterProjectBatchFunction(
+ ProjectionContext::ProjectBatchFunction project_batch_function) {
+ ICEBERG_DCHECK(project_batch_function != nullptr,
+ "ProjectBatch implementation must not be null");
+ if (project_batch_function == nullptr) {
+ return;
+ }
+ std::lock_guard lock(g_project_batch_function_mutex);
+ g_project_batch_function = project_batch_function;
+}
+
+bool ProjectionContext::HasProjectBatchFunction() {
+ return GetProjectBatchFunction() != nullptr;
+}
+
+auto ProjectionContext::ResolveProjectBatchFunction()
+ -> ProjectionContext::ProjectBatchFunction {
+ return GetProjectBatchFunction();
+}
+
+namespace {
+
+Result<ArrowArray> ProjectBatchNanoarrow(
+ const ArrowSchema& input_arrow_schema, const ArrowArray& input_batch,
+ std::span<const int32_t> row_indices, const ArrowSchema&
output_arrow_schema,
+ std::span<const int32_t> selected_field_indices) {
+ ArrowArrayView input_view;
+ ArrowError error;
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayViewInitFromSchema(&input_view, &input_arrow_schema, &error),
error);
+ internal::ArrowArrayViewGuard input_view_guard(&input_view);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayViewSetArray(&input_view, &input_batch, &error), error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayViewValidate(&input_view, NANOARROW_VALIDATION_LEVEL_DEFAULT,
&error),
+ error);
+
+ ArrowArray output_array;
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayInitFromSchema(&output_array, &output_arrow_schema, &error),
error);
+ internal::ArrowArrayGuard output_array_guard(&output_array);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&output_array));
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED(
+ ArrowArrayReserve(&output_array,
static_cast<int64_t>(row_indices.size())));
+
+ for (int64_t row_index : row_indices) {
+ ICEBERG_PRECHECK(row_index >= 0 && row_index < input_batch.length,
+ "Row index {} out of range for batch length {}",
row_index,
+ input_batch.length);
+ for (size_t output_index = 0; output_index < selected_field_indices.size();
+ ++output_index) {
+ const int32_t input_index = selected_field_indices[output_index];
+
ICEBERG_RETURN_UNEXPECTED(AppendValue(*input_arrow_schema.children[input_index],
+ *input_batch.children[input_index],
+ *input_view.children[input_index],
row_index,
+
output_array.children[output_index]));
+ }
+
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&output_array));
+ }
+
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+ ArrowArrayFinishBuildingDefault(&output_array, &error), error);
+
+ return std::exchange(output_array, ArrowArray{});
+}
+
+} // namespace
+
+Result<ArrowArray> ProjectBatch(ArrowArray* input_batch,
+ std::span<const int32_t> row_indices,
+ ProjectionContext& projection) {
+ ICEBERG_PRECHECK(input_batch != nullptr, "input_batch must not be null");
+ internal::ArrowArrayGuard input_batch_guard(input_batch);
+
+ auto project_batch_function = projection.project_batch_function();
+ if (project_batch_function != nullptr) {
+ // ProjectBatch owns input_batch. Arrow-backed implementations import it
and clear
+ // release, so input_batch_guard becomes a no-op instead of
double-releasing.
+ return project_batch_function(input_batch, row_indices, projection);
+ }
+
+ return ProjectBatchNanoarrow(projection.input_arrow_schema(), *input_batch,
row_indices,
+ projection.output_arrow_schema(),
+ projection.selected_field_indices());
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/arrow_c_data_util_internal.h
b/src/iceberg/arrow_c_data_util_internal.h
new file mode 100644
index 00000000..e02db29a
--- /dev/null
+++ b/src/iceberg/arrow_c_data_util_internal.h
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cerrno>
+#include <concepts>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <optional>
+#include <span>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Cached state for ProjectBatch over one input/output schema pair.
+///
+/// Exported because this internal utility is shared across library
translation units.
+class ICEBERG_EXPORT ProjectionContext {
+ public:
+ using ProjectBatchState = std::shared_ptr<void>;
+
+ using ProjectBatchFunction = auto (*)(ArrowArray* input_batch,
+ std::span<const int32_t> row_indices,
+ ProjectionContext& projection)
+ -> Result<ArrowArray>;
+
+ /// \brief Register a custom implementation for ProjectBatch.
+ ///
+ /// Registration is process-wide. If multiple implementations are registered,
+ /// the last non-null implementation wins.
+ static void RegisterProjectBatchFunction(ProjectBatchFunction
project_batch_function);
+
+ /// \brief Returns true when a custom implementation has been registered.
+ static bool HasProjectBatchFunction();
+
+ /// \brief Resolve the registered ProjectBatch implementation.
+ static auto ResolveProjectBatchFunction() -> ProjectBatchFunction;
+
+ /// \brief Build reusable projection state for a validated schema pair.
+ ///
+ /// \param input_schema Schema that describes every input batch.
+ /// \param output_schema Final schema and column order requested by the
caller.
+ /// \param project_batch_function Optional implementation returned by
+ /// ProjectionContext::ResolveProjectBatchFunction, or nullptr to use the
nanoarrow
+ /// path.
+ /// \note It validates that output_schema selects or reorders complete
top-level fields
+ /// by field id. Nested pruning and type changes are rejected. The
input_schema and
+ /// output_schema passed to Make must outlive the context. ProjectBatch may
lazily
+ /// initialize backend cache; do not share one context across concurrent
calls.
+ static Result<ProjectionContext> Make(const Schema& input_schema,
+ const Schema& output_schema,
+ ProjectBatchFunction
project_batch_function);
+
+ ProjectionContext(ProjectionContext&&) noexcept;
+ ProjectionContext& operator=(ProjectionContext&&) noexcept;
+ ~ProjectionContext();
+
+ ProjectionContext(const ProjectionContext&) = delete;
+ ProjectionContext& operator=(const ProjectionContext&) = delete;
+
+ const Schema& input_schema() const;
+
+ const Schema& output_schema() const;
+
+ const ArrowSchema& input_arrow_schema() const;
+
+ const ArrowSchema& output_arrow_schema() const;
+
+ std::span<const int32_t> selected_field_indices() const;
+
+ ProjectBatchFunction project_batch_function() const;
+
+ ProjectBatchState& project_batch_state();
+
+ private:
+ ProjectionContext() = default;
+
+ // Raw schema pointers are borrowed from caller-owned schemas.
FileScanTaskReader
+ // keeps those schema objects alive in the same stream source that owns this
context.
+ const Schema* input_schema_ = nullptr;
+ const Schema* output_schema_ = nullptr;
+ std::vector<int32_t> selected_field_indices_;
+ ArrowSchema input_arrow_schema_{};
+ ArrowSchema output_arrow_schema_{};
+ ProjectBatchFunction project_batch_function_ = nullptr;
+ ProjectBatchState project_batch_state_;
+};
+
+/// \brief Concept for sources that can be wrapped as ArrowArrayStreams.
+template <typename Source>
+concept ArrowArrayStreamProvider = requires(Source& source) {
+ { source.Close() } -> std::same_as<Status>;
+ { source.Next() } -> std::same_as<Result<std::optional<ArrowArray>>>;
+ { source.Schema() } -> std::same_as<Result<ArrowSchema>>;
+};
+
+ namespace detail {
+
+ /// \brief State stored in ArrowArrayStream::private_data.
+ ///
+ /// Owns the wrapped source plus the message returned by get_last_error. The
+ /// source is closed (best effort: the returned Status is ignored) when this
+ /// object is destroyed, which happens in the stream's release callback.
+ template <ArrowArrayStreamProvider Source>
+ struct ArrowArrayStreamPrivateData {
+   std::unique_ptr<Source> source;
+   std::string last_error;
+
+   explicit ArrowArrayStreamPrivateData(std::unique_ptr<Source> src)
+       : source(std::move(src)) {}
+
+   ~ArrowArrayStreamPrivateData() {
+     if (source != nullptr) {
+       std::ignore = source->Close();
+     }
+   }
+ };
+
+ /// \brief get_schema callback.
+ ///
+ /// \return 0 on success; EINVAL for a released stream or a null `out`; EIO when
+ /// the source fails, in which case `out` is zeroed and last_error is recorded.
+ template <ArrowArrayStreamProvider Source>
+ int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out) {
+   // A null `out` is rejected too: both the success path and the memset on the
+   // error path would otherwise write through it.
+   if (stream == nullptr || stream->private_data == nullptr || out == nullptr) {
+     return EINVAL;
+   }
+
+   auto* private_data =
+       static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
+   auto schema_result = private_data->source->Schema();
+   if (!schema_result.has_value()) {
+     private_data->last_error = schema_result.error().message;
+     std::memset(out, 0, sizeof(ArrowSchema));
+     return EIO;
+   }
+
+   *out = std::move(schema_result.value());
+   return 0;
+ }
+
+ /// \brief get_next callback.
+ ///
+ /// On success writes the next batch to `out`, or a zeroed ArrowArray (release
+ /// == nullptr) to signal the end of the stream.
+ ///
+ /// \return 0 on success; EINVAL for a released stream or a null `out`; EIO when
+ /// the source fails, in which case `out` is zeroed and last_error is recorded.
+ template <ArrowArrayStreamProvider Source>
+ int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
+   // Same null-`out` guard as GetSchema: every path below writes to `out`.
+   if (stream == nullptr || stream->private_data == nullptr || out == nullptr) {
+     return EINVAL;
+   }
+
+   auto* private_data =
+       static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
+   auto next_result = private_data->source->Next();
+   if (!next_result.has_value()) {
+     private_data->last_error = next_result.error().message;
+     std::memset(out, 0, sizeof(ArrowArray));
+     return EIO;
+   }
+
+   auto& optional_array = next_result.value();
+   if (optional_array.has_value()) {
+     *out = std::move(optional_array.value());
+   } else {
+     // End of stream: a zeroed array has release == nullptr.
+     std::memset(out, 0, sizeof(ArrowArray));
+   }
+
+   return 0;
+ }
+
+ /// \brief get_last_error callback: the most recent failure message, or nullptr
+ /// if none has been recorded (or the stream was released).
+ template <ArrowArrayStreamProvider Source>
+ const char* GetLastError(struct ArrowArrayStream* stream) {
+   if (stream == nullptr || stream->private_data == nullptr) {
+     return nullptr;
+   }
+
+   auto* private_data =
+       static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
+   return private_data->last_error.empty() ? nullptr : private_data->last_error.c_str();
+ }
+
+ /// \brief release callback: destroys the private data (which closes the source)
+ /// and marks the stream as released by clearing `release`.
+ template <ArrowArrayStreamProvider Source>
+ void Release(struct ArrowArrayStream* stream) {
+   if (stream == nullptr || stream->private_data == nullptr) {
+     return;
+   }
+
+   delete static_cast<ArrowArrayStreamPrivateData<Source>*>(stream->private_data);
+   stream->private_data = nullptr;
+   stream->release = nullptr;
+ }
+
+ } // namespace detail
+
+ /// \brief Expose `source` through the Arrow C stream interface.
+ ///
+ /// The returned stream takes ownership of `source`. The source is closed and
+ /// destroyed when the stream's release callback runs.
+ ///
+ /// \return InvalidArgument if `source` is null.
+ template <ArrowArrayStreamProvider Source>
+ Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Source> source) {
+   if (!source) {
+     return InvalidArgument("Cannot make ArrowArrayStream from null source");
+   }
+
+   using PrivateData = detail::ArrowArrayStreamPrivateData<Source>;
+   auto owned_state = std::make_unique<PrivateData>(std::move(source));
+
+   ArrowArrayStream stream{};
+   stream.get_schema = detail::GetSchema<Source>;
+   stream.get_next = detail::GetNext<Source>;
+   stream.get_last_error = detail::GetLastError<Source>;
+   stream.release = detail::Release<Source>;
+   // From here on the stream's release callback is responsible for the state.
+   stream.private_data = owned_state.release();
+   return stream;
+ }
+
+ /// \brief Wrap a Reader as an ArrowArrayStream.
+ ///
+ /// NOTE(review): the definition is not visible here. Presumably it mirrors the
+ /// ArrowArrayStreamProvider overload: it takes ownership of `reader`, closes it
+ /// on release, and rejects a null reader. Confirm against arrow_c_data_util.cc.
+ ICEBERG_EXPORT Result<ArrowArrayStream> MakeArrowArrayStream(
+     std::unique_ptr<Reader> reader);
+
+ /// \brief Project selected rows from a batch into complete top-level fields.
+ ///
+ /// `input_batch` is consumed by this function. If the projection carries a registered
+ /// implementation, the call is delegated to it; otherwise the built-in nanoarrow
+ /// implementation is used. The projection must have been created for the stable
+ /// schema pair that describes the input batch and requested output. This function
+ /// does not revalidate schema compatibility on each batch.
+ ///
+ /// `projection` may lazily initialize backend state on first use, so one context
+ /// must not be used by concurrent calls.
+ ///
+ /// \param input_batch Owned Arrow C Data batch to project.
+ /// \param row_indices Zero-based row positions to copy from `input_batch`.
+ /// NOTE(review): bounds checking against `input_batch->length` is not visible
+ /// here. Confirm that callers pass in-range indices.
+ /// \param projection Reusable schema/projection state created by
+ /// ProjectionContext::Make.
+ /// \return A newly owned ArrowArray matching `projection.output_schema()`.
+ ICEBERG_EXPORT Result<ArrowArray> ProjectBatch(ArrowArray* input_batch,
+                                                std::span<const int32_t> row_indices,
+                                                ProjectionContext& projection);
+
+} // namespace iceberg
diff --git a/src/iceberg/data/delete_filter.cc
b/src/iceberg/data/delete_filter.cc
index 876d644e..5f21a32d 100644
--- a/src/iceberg/data/delete_filter.cc
+++ b/src/iceberg/data/delete_filter.cc
@@ -20,6 +20,7 @@
#include "iceberg/data/delete_filter.h"
#include <algorithm>
+#include <limits>
#include <map>
#include <optional>
#include <set>
@@ -745,7 +746,10 @@ Result<AliveRowSelection>
DeleteFilter::ComputeAliveRows(const ArrowSchema& batc
return result;
}
- result.indices.reserve(batch.length);
+ ICEBERG_PRECHECK(
+ batch.length <=
static_cast<int64_t>(std::numeric_limits<int32_t>::max()),
+ "Batch length {} exceeds int32_t row index capacity", batch.length);
+ result.indices.reserve(static_cast<size_t>(batch.length));
ICEBERG_ASSIGN_OR_RAISE(auto row, ArrowArrayStructLike::Make(batch_schema,
batch));
for (int64_t i = 0; i < batch.length; ++i) {
diff --git a/src/iceberg/data/file_scan_task_reader.cc
b/src/iceberg/data/file_scan_task_reader.cc
new file mode 100644
index 00000000..7076486a
--- /dev/null
+++ b/src/iceberg/data/file_scan_task_reader.cc
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/file_scan_task_reader.h"
+
+#include <algorithm>
+#include <memory>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/data/delete_filter.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+ /// Assemble the ReaderOptions for opening `data_file` with the given read
+ /// projection, filter, name mapping and reader properties.
+ ///
+ /// `file_size_in_bytes` is narrowed to size_t, so callers must already have
+ /// checked that it is non-negative (FileScanTaskReader::Impl::Open does).
+ ReaderOptions MakeReaderOptions(const DataFile& data_file, std::shared_ptr<FileIO> io,
+                                 std::shared_ptr<Schema> projection,
+                                 std::shared_ptr<Expression> filter,
+                                 std::shared_ptr<NameMapping> name_mapping,
+                                 ReaderProperties properties) {
+   const auto file_length = static_cast<size_t>(data_file.file_size_in_bytes);
+   ReaderOptions options{.path = data_file.file_path,
+                         .length = file_length,
+                         .io = std::move(io),
+                         .projection = std::move(projection),
+                         .filter = std::move(filter),
+                         .name_mapping = std::move(name_mapping),
+                         .properties = std::move(properties)};
+   return options;
+ }
+
+ /// Stream source for the merge-on-read path.
+ ///
+ /// Reads batches using the delete filter's required schema and asks the
+ /// DeleteFilter which rows are still alive. Batches with no live rows are
+ /// skipped. Other batches are emitted in the projected schema, either passed
+ /// through untouched (all rows alive and no column change) or via ProjectBatch.
+ class MergeOnReadStreamSource {
+  public:
+   MergeOnReadStreamSource(std::unique_ptr<Reader> reader,
+                           std::unique_ptr<DeleteFilter> delete_filter,
+                           std::shared_ptr<::iceberg::Schema> required_schema,
+                           std::shared_ptr<::iceberg::Schema> projected_schema,
+                           ProjectionContext projection_context)
+       : reader_(std::move(reader)),
+         delete_filter_(std::move(delete_filter)),
+         required_schema_(std::move(required_schema)),
+         projected_schema_(std::move(projected_schema)),
+         schemas_match_(required_schema_->SameSchema(*projected_schema_)),
+         projection_context_(std::move(projection_context)) {}
+
+   // Owns a raw ArrowSchema that the destructor releases; never copy it.
+   MergeOnReadStreamSource(const MergeOnReadStreamSource&) = delete;
+   MergeOnReadStreamSource& operator=(const MergeOnReadStreamSource&) = delete;
+
+   ~MergeOnReadStreamSource() {
+     if (!cached_schema_) {
+       return;
+     }
+     if (cached_schema_->release != nullptr) {
+       cached_schema_->release(&*cached_schema_);
+     }
+   }
+
+   /// Closes the underlying file reader, if there is one.
+   Status Close() { return reader_ != nullptr ? reader_->Close() : Status{}; }
+
+   /// Returns the next batch that still has live rows, or std::nullopt at the
+   /// end of the stream.
+   Result<std::optional<ArrowArray>> Next() {
+     if (!cached_schema_) {
+       // File readers expose one stable Arrow schema for every batch in the
+       // stream, so it is fetched once and reused for delete evaluation.
+       ICEBERG_ASSIGN_OR_RAISE(cached_schema_, reader_->Schema());
+     }
+     const ArrowSchema& batch_schema = *cached_schema_;
+
+     for (;;) {
+       ICEBERG_ASSIGN_OR_RAISE(auto maybe_batch, reader_->Next());
+       if (!maybe_batch) {
+         return std::nullopt;
+       }
+
+       ArrowArray batch = std::move(*maybe_batch);
+       // Releases `batch` on every exit unless ownership is handed off below.
+       internal::ArrowArrayGuard batch_guard(&batch);
+
+       ICEBERG_ASSIGN_OR_RAISE(auto selection,
+                               delete_filter_->ComputeAliveRows(batch_schema, batch));
+       if (selection.empty()) {
+         // No live rows in this batch; move on to the next one.
+         continue;
+       }
+
+       if (selection.alive_count() == batch.length && schemas_match_) {
+         // Nothing to filter or project: give the reader's batch to the caller.
+         // Resetting `batch` leaves the guard with nothing to release.
+         return std::exchange(batch, ArrowArray{});
+       }
+
+       return ProjectBatch(&batch, selection.indices, projection_context_);
+     }
+   }
+
+   /// Arrow schema of the emitted batches, which is always the projected schema.
+   Result<ArrowSchema> Schema() {
+     ArrowSchema out{};
+     ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*projected_schema_, &out));
+     return out;
+   }
+
+  private:
+   std::unique_ptr<Reader> reader_;
+   std::unique_ptr<DeleteFilter> delete_filter_;
+   // Kept alive here because projection_context_ borrows raw pointers to them.
+   // They are declared before projection_context_, so they are destroyed after it.
+   std::shared_ptr<::iceberg::Schema> required_schema_;
+   std::shared_ptr<::iceberg::Schema> projected_schema_;
+   // True when no column projection is needed (required == projected schema).
+   bool schemas_match_ = false;
+   ProjectionContext projection_context_;
+   // Reader's Arrow schema, fetched lazily on the first Next() call.
+   std::optional<ArrowSchema> cached_schema_;
+ };
+
+} // namespace
+
+ class FileScanTaskReader::Impl {
+  public:
+   /// Validate `options` and build the state shared by every task opened later:
+   /// the field lookup used to resolve delete-file columns and a DeleteCounter.
+   ///
+   /// \return An error if a required option is null or `schemas` has a null entry.
+   static Result<std::unique_ptr<Impl>> Make(Options options) {
+     ICEBERG_PRECHECK(options.io != nullptr, "FileIO must not be null");
+     ICEBERG_PRECHECK(options.table_schema != nullptr, "Table schema must not be null");
+     ICEBERG_PRECHECK(options.projected_schema != nullptr,
+                      "Projected schema must not be null");
+     for (const auto& schema : options.schemas) {
+       ICEBERG_PRECHECK(schema != nullptr, "Schema list must not contain null schemas");
+     }
+
+     ICEBERG_ASSIGN_OR_RAISE(
+         auto field_lookup,
+         DeleteFilter::MakeFieldLookup(options.table_schema, options.schemas));
+     auto delete_counter = std::make_shared<DeleteCounter>();
+
+     return std::unique_ptr<Impl>(
+         new Impl(std::move(options), std::move(field_lookup), std::move(delete_counter)));
+   }
+
+   /// Open `task` as an ArrowArrayStream exposing `projected_schema_`.
+   ///
+   /// Tasks without delete files stream the data file as read. Otherwise the data
+   /// file is read with the delete filter's required schema and wrapped in a
+   /// MergeOnReadStreamSource, which drops deleted rows and projects the rest.
+   ///
+   /// \return An error if the data file is null or has a negative size, if a
+   /// delete-file entry is null, or if opening the reader or filter fails.
+   Result<ArrowArrayStream> Open(const FileScanTask& task) {
+     const auto& data_file = task.data_file();
+     ICEBERG_PRECHECK(data_file != nullptr, "Data file must not be null");
+     ICEBERG_PRECHECK(data_file->file_size_in_bytes >= 0,
+                      "Data file size must not be negative: {}",
+                      data_file->file_size_in_bytes);
+
+     const auto& delete_files = task.delete_files();
+     if (delete_files.empty()) {
+       auto options = MakeReaderOptions(*data_file, io_, projected_schema_,
+                                        task.residual_filter(), name_mapping_, properties_);
+       ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                               ReaderFactoryRegistry::Open(data_file->file_format, options));
+       return MakeArrowArrayStream(std::move(reader));
+     }
+
+     // Validate delete-file entries before dereferencing them, so a malformed
+     // task surfaces as an error rather than a null dereference.
+     bool has_position_deletes = false;
+     for (const auto& delete_file : delete_files) {
+       ICEBERG_PRECHECK(delete_file != nullptr, "Delete file must not be null");
+       if (delete_file->content == DataFile::Content::kPositionDeletes) {
+         has_position_deletes = true;
+       }
+     }
+
+     ICEBERG_ASSIGN_OR_RAISE(
+         auto delete_filter,
+         DeleteFilter::Make(data_file->file_path, delete_files, projected_schema_, io_,
+                            field_lookup_, has_position_deletes, delete_counter_));
+
+     auto required_schema = delete_filter->RequiredSchema();
+     auto project_batch_function = ProjectionContext::ResolveProjectBatchFunction();
+     // ProjectionContext borrows schemas that are kept in MergeOnReadStreamSource.
+     ICEBERG_ASSIGN_OR_RAISE(auto projection_context,
+                             ProjectionContext::Make(*required_schema, *projected_schema_,
+                                                     project_batch_function));
+
+     auto options = MakeReaderOptions(*data_file, io_, required_schema,
+                                      task.residual_filter(), name_mapping_, properties_);
+     ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                             ReaderFactoryRegistry::Open(data_file->file_format, options));
+
+     auto mor_reader = std::make_unique<MergeOnReadStreamSource>(
+         std::move(reader), std::move(delete_filter), std::move(required_schema),
+         projected_schema_, std::move(projection_context));
+     return MakeArrowArrayStream(std::move(mor_reader));
+   }
+
+  private:
+   Impl(Options options, DeleteFilter::FieldLookup field_lookup,
+        std::shared_ptr<DeleteCounter> delete_counter)
+       : io_(std::move(options.io)),
+         table_schema_(std::move(options.table_schema)),
+         schemas_(std::move(options.schemas)),
+         projected_schema_(std::move(options.projected_schema)),
+         name_mapping_(std::move(options.name_mapping)),
+         properties_(ReaderProperties::FromMap(options.properties)),
+         field_lookup_(std::move(field_lookup)),
+         delete_counter_(std::move(delete_counter)) {}
+
+   std::shared_ptr<FileIO> io_;
+   // Retained together with schemas_ so that every schema handed to
+   // MakeFieldLookup outlives field_lookup_, in case the lookup borrows from them.
+   std::shared_ptr<Schema> table_schema_;
+   std::vector<std::shared_ptr<Schema>> schemas_;
+   std::shared_ptr<Schema> projected_schema_;
+   std::shared_ptr<NameMapping> name_mapping_;
+   ReaderProperties properties_;
+   DeleteFilter::FieldLookup field_lookup_;
+   // Shared by every task opened through this reader.
+   std::shared_ptr<DeleteCounter> delete_counter_;
+ };
+
+ FileScanTaskReader::FileScanTaskReader(std::unique_ptr<Impl> impl)
+     : impl_(std::move(impl)) {}
+
+ FileScanTaskReader::~FileScanTaskReader() = default;
+
+ Result<std::unique_ptr<FileScanTaskReader>> FileScanTaskReader::Make(Options options) {
+   ICEBERG_ASSIGN_OR_RAISE(std::unique_ptr<Impl> impl, Impl::Make(std::move(options)));
+   // The constructor is private, so std::make_unique cannot be used here.
+   return std::unique_ptr<FileScanTaskReader>(new FileScanTaskReader(std::move(impl)));
+ }
+
+ Result<ArrowArrayStream> FileScanTaskReader::Open(const FileScanTask& task) {
+   // All validation and stream construction live in Impl::Open.
+   return impl_->Open(task);
+ }
+
+} // namespace iceberg
diff --git a/src/iceberg/data/file_scan_task_reader.h
b/src/iceberg/data/file_scan_task_reader.h
new file mode 100644
index 00000000..a71ef5f8
--- /dev/null
+++ b/src/iceberg/data/file_scan_task_reader.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/data/file_scan_task_reader.h
+/// Delete-aware FileScanTask reader for copy-on-write and merge-on-read paths.
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/iceberg_data_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+ /// \brief Opens a FileScanTask as an ArrowArrayStream of its live rows.
+ ///
+ /// Tasks without delete files are streamed directly from the data file. Tasks
+ /// with position and/or equality delete files take the merge-on-read path:
+ /// 1. the data file is read with the columns the delete filter requires;
+ /// 2. deleted rows are dropped;
+ /// 3. the remaining rows are projected.
+ /// In both cases the returned stream exposes `projected_schema`.
+ ///
+ /// TODO(gangwu): Add a mode that emits a `_deleted` column instead of filtering rows.
+ /// TODO(gangwu): Use evaluator to apply residual expression filters.
+ class ICEBERG_DATA_EXPORT FileScanTaskReader {
+  public:
+   /// \brief Options shared by all tasks opened by this reader.
+   struct Options {
+     /// FileIO instance for reading data and delete files. Required (non-null).
+     std::shared_ptr<FileIO> io;
+     /// The table schema. Used as the primary field lookup for delete file
+     /// resolution. Required (non-null).
+     std::shared_ptr<Schema> table_schema;
+     /// Optional list of historical table schemas for field lookup. Entries must
+     /// not be null.
+     std::vector<std::shared_ptr<Schema>> schemas;
+     /// The output schema for the returned ArrowArrayStream. Must be a
+     /// projection of table_schema. Required (non-null).
+     std::shared_ptr<Schema> projected_schema;
+     /// Optional name mapping for files written without field IDs.
+     std::shared_ptr<NameMapping> name_mapping;
+     /// Format-specific or implementation-specific options for data readers.
+     std::unordered_map<std::string, std::string> properties;
+   };
+
+   /// \brief Create a reusable task reader from shared read context.
+   ///
+   /// \return An error if `io`, `table_schema` or `projected_schema` is null, or if
+   /// `schemas` contains a null entry.
+   static Result<std::unique_ptr<FileScanTaskReader>> Make(Options options);
+
+   ~FileScanTaskReader();
+
+   /// \brief Open a task and return an Arrow C stream for its projected live rows.
+   ///
+   /// The caller owns the returned stream and must invoke its `release` callback.
+   /// The stream owns the underlying file reader.
+   ///
+   /// \return An error if the task's data file is null or reports a negative size,
+   /// or if the data file cannot be opened.
+   ///
+   /// NOTE(review): Open() is non-const, and all tasks opened by one reader share a
+   /// single DeleteCounter. Concurrent use from several threads is not established
+   /// here; confirm before sharing an instance across threads.
+   Result<ArrowArrayStream> Open(const FileScanTask& task);
+
+   FileScanTaskReader(const FileScanTaskReader&) = delete;
+   FileScanTaskReader& operator=(const FileScanTaskReader&) = delete;
+
+  private:
+   class Impl;
+   explicit FileScanTaskReader(std::unique_ptr<Impl> impl);
+   std::unique_ptr<Impl> impl_;
+ };
+
+} // namespace iceberg
diff --git a/src/iceberg/data/meson.build b/src/iceberg/data/meson.build
index f0877ec6..bbb26db2 100644
--- a/src/iceberg/data/meson.build
+++ b/src/iceberg/data/meson.build
@@ -21,6 +21,7 @@ install_headers(
'delete_filter.h',
'delete_loader.h',
'equality_delete_writer.h',
+ 'file_scan_task_reader.h',
'position_delete_writer.h',
'writer.h',
],
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index c31d9b29..c76c1009 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -68,7 +68,7 @@ struct ICEBERG_EXPORT Split {
size_t length;
};
-class ReaderProperties : public ConfigBase<ReaderProperties> {
+class ICEBERG_EXPORT ReaderProperties : public ConfigBase<ReaderProperties> {
public:
template <typename T>
using Entry = const ConfigBase<ReaderProperties>::Entry<T>;
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 03dc2447..69fe733b 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -41,6 +41,7 @@ configure_file(
iceberg_include_dir = include_directories('..')
iceberg_sources = files(
'arrow_c_data_guard_internal.cc',
+ 'arrow_c_data_util.cc',
'catalog/memory/in_memory_catalog.cc',
'delete_file_index.cc',
'expression/aggregate.cc',
@@ -147,6 +148,7 @@ iceberg_data_sources = files(
'data/delete_filter.cc',
'data/delete_loader.cc',
'data/equality_delete_writer.cc',
+ 'data/file_scan_task_reader.cc',
'data/position_delete_writer.cc',
'data/writer.cc',
'deletes/position_delete_index.cc',
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 71075d90..6881d34f 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -20,14 +20,11 @@
#include "iceberg/table_scan.h"
#include <cstdint>
-#include <cstring>
-#include <iterator>
#include <utility>
#include "iceberg/expression/binder.h"
#include "iceberg/expression/expression.h"
#include "iceberg/expression/residual_evaluator.h"
-#include "iceberg/file_reader.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_group.h"
#include "iceberg/result.h"
@@ -61,103 +58,6 @@ const std::vector<std::string> kScanColumnsWithStats = [] {
return cols;
}();
-/// \brief Private data structure to hold the Reader and error state
-struct ReaderStreamPrivateData {
- std::unique_ptr<Reader> reader;
- std::string last_error;
-
- explicit ReaderStreamPrivateData(std::unique_ptr<Reader> reader_ptr)
- : reader(std::move(reader_ptr)) {}
-
- ~ReaderStreamPrivateData() {
- if (reader) {
- std::ignore = reader->Close();
- }
- }
-};
-
-/// \brief Callback to get the stream schema
-static int GetSchema(struct ArrowArrayStream* stream, struct ArrowSchema* out)
{
- if (!stream || !stream->private_data) {
- return EINVAL;
- }
- auto* private_data =
static_cast<ReaderStreamPrivateData*>(stream->private_data);
- // Get schema from reader
- auto schema_result = private_data->reader->Schema();
- if (!schema_result.has_value()) {
- private_data->last_error = schema_result.error().message;
- std::memset(out, 0, sizeof(ArrowSchema));
- return EIO;
- }
-
- *out = std::move(schema_result.value());
- return 0;
-}
-
-/// \brief Callback to get the next array from the stream
-static int GetNext(struct ArrowArrayStream* stream, struct ArrowArray* out) {
- if (!stream || !stream->private_data) {
- return EINVAL;
- }
-
- auto* private_data =
static_cast<ReaderStreamPrivateData*>(stream->private_data);
-
- auto next_result = private_data->reader->Next();
- if (!next_result.has_value()) {
- private_data->last_error = next_result.error().message;
- std::memset(out, 0, sizeof(ArrowArray));
- return EIO;
- }
-
- auto& optional_array = next_result.value();
- if (optional_array.has_value()) {
- *out = std::move(optional_array.value());
- } else {
- // End of stream - set release to nullptr to signal end
- std::memset(out, 0, sizeof(ArrowArray));
- out->release = nullptr;
- }
-
- return 0;
-}
-
-/// \brief Callback to get the last error message
-static const char* GetLastError(struct ArrowArrayStream* stream) {
- if (!stream || !stream->private_data) {
- return nullptr;
- }
-
- auto* private_data =
static_cast<ReaderStreamPrivateData*>(stream->private_data);
- return private_data->last_error.empty() ? nullptr :
private_data->last_error.c_str();
-}
-
-/// \brief Callback to release the stream resources
-static void Release(struct ArrowArrayStream* stream) {
- if (!stream || !stream->private_data) {
- return;
- }
-
- delete static_cast<ReaderStreamPrivateData*>(stream->private_data);
- stream->private_data = nullptr;
- stream->release = nullptr;
-}
-
-Result<ArrowArrayStream> MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
- if (!reader) {
- return InvalidArgument("Reader cannot be null");
- }
-
- auto private_data =
std::make_unique<ReaderStreamPrivateData>(std::move(reader));
-
- ArrowArrayStream stream{.get_schema = GetSchema,
- .get_next = GetNext,
- .get_last_error = GetLastError,
- .release = Release,
- .private_data = private_data.release()};
-
- return stream;
-}
-
} // namespace
namespace internal {
@@ -280,24 +180,6 @@ int32_t FileScanTask::files_count() const { return 1; }
int64_t FileScanTask::estimated_row_count() const { return
data_file_->record_count; }
-Result<ArrowArrayStream> FileScanTask::ToArrow(
- const std::shared_ptr<FileIO>& io, std::shared_ptr<Schema>
projected_schema) const {
- if (!delete_files_.empty()) {
- return NotSupported("Reading data files with delete files is not yet
supported.");
- }
-
- const ReaderOptions options{.path = data_file_->file_path,
- .length = data_file_->file_size_in_bytes,
- .io = io,
- .projection = std::move(projected_schema),
- .filter = residual_filter_};
-
- ICEBERG_ASSIGN_OR_RAISE(auto reader,
- ReaderFactoryRegistry::Open(data_file_->file_format,
options));
-
- return MakeArrowArrayStream(std::move(reader));
-}
-
// ChangelogScanTask implementation
int64_t ChangelogScanTask::size_bytes() const {
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index 307946e6..64fb3ffd 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -27,7 +27,6 @@
#include <unordered_set>
#include <vector>
-#include "iceberg/arrow_c_data.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
#include "iceberg/table_metadata.h"
@@ -87,16 +86,6 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
int32_t files_count() const override;
int64_t estimated_row_count() const override;
- /// TODO(gangwu): move it to iceberg/data/task_scanner.h
- ///
- /// \brief Returns a C-ABI compatible ArrowArrayStream to read the data for
this task.
- ///
- /// \param io The FileIO instance for accessing the file data.
- /// \param projected_schema The projected schema for reading the data.
- /// \return A Result containing an ArrowArrayStream, or an error on failure.
- Result<ArrowArrayStream> ToArrow(const std::shared_ptr<FileIO>& io,
- std::shared_ptr<Schema> projected_schema)
const;
-
private:
std::shared_ptr<DataFile> data_file_;
std::vector<std::shared_ptr<DataFile>> delete_files_;
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 37e66ed5..c632403c 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -222,9 +222,11 @@ if(ICEBERG_BUILD_BUNDLE)
add_iceberg_test(data_test
USE_BUNDLE
SOURCES
+ arrow_c_data_util_test.cc
data_writer_test.cc
delete_filter_test.cc
- delete_loader_test.cc)
+ delete_loader_test.cc
+ file_scan_task_reader_test.cc)
endif()
diff --git a/src/iceberg/test/arrow_c_data_util_test.cc
b/src/iceberg/test/arrow_c_data_util_test.cc
new file mode 100644
index 00000000..c93a928a
--- /dev/null
+++ b/src/iceberg/test/arrow_c_data_util_test.cc
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/// \file arrow_c_data_util_test.cc
+/// Verifies ProjectBatch behavior across registered implementations.
+
+#include <cstdint>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <arrow/type.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_register.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+
+namespace iceberg::internal {
+
+ namespace {
+
+ /// Build a RecordBatch with `schema` from a JSON array of row arrays.
+ std::shared_ptr<::arrow::RecordBatch> MakeBatch(const Schema& schema,
+                                                 std::string_view json) {
+   // Zero-initialized so a failed ToArrowSchema makes ImportSchema fail cleanly
+   // instead of reading an uninitialized release pointer.
+   ArrowSchema c_schema{};
+   EXPECT_THAT(ToArrowSchema(schema, &c_schema), IsOk());
+   // ImportSchema takes ownership of c_schema and calls release.
+   auto arrow_schema = ::arrow::ImportSchema(&c_schema).ValueOrDie();
+   auto struct_type = ::arrow::struct_(arrow_schema->fields());
+   return ::arrow::RecordBatch::FromStructArray(
+              ::arrow::json::ArrayFromJSONString(struct_type, std::string(json))
+                  .ValueOrDie())
+       .ValueOrDie();
+ }
+
+ /// Register the Arrow compute backend and return its ProjectBatch implementation.
+ ProjectionContext::ProjectBatchFunction ArrowComputeFunction() {
+   arrow::RegisterAll();
+   auto function = ProjectionContext::ResolveProjectBatchFunction();
+   EXPECT_NE(function, nullptr);
+   return function;
+ }
+
+ /// Run `batch` through ProjectBatch with the given backend (nullptr = nanoarrow).
+ ///
+ /// Returns nullptr after recording a gtest failure if any step fails, so that
+ /// callers stop instead of calling value() on an error Result.
+ std::shared_ptr<::arrow::RecordBatch> RunProjectBatch(
+     const ::arrow::RecordBatch& batch, const std::vector<int32_t>& alive_indices,
+     const Schema& required_schema, const Schema& projected_schema,
+     ProjectionContext::ProjectBatchFunction project_batch_function) {
+   // Zero-initialized so the guards below are no-ops if the export fails.
+   ArrowSchema c_schema{};
+   ArrowArray c_array{};
+   auto export_status = ::arrow::ExportRecordBatch(batch, &c_array, &c_schema);
+   ArrowSchemaGuard schema_guard(&c_schema);
+   ArrowArrayGuard array_guard(&c_array);
+   if (!export_status.ok()) {
+     ADD_FAILURE() << "ExportRecordBatch failed: " << export_status.ToString();
+     return nullptr;
+   }
+
+   auto projection =
+       ProjectionContext::Make(required_schema, projected_schema, project_batch_function);
+   EXPECT_THAT(projection, IsOk());
+   if (!projection.has_value()) {
+     return nullptr;
+   }
+
+   auto result = ProjectBatch(&c_array, alive_indices, projection.value());
+   EXPECT_THAT(result, IsOk());
+   if (!result.has_value()) {
+     return nullptr;
+   }
+
+   ArrowSchema out_c_schema{};
+   EXPECT_THAT(ToArrowSchema(projected_schema, &out_c_schema), IsOk());
+   auto arrow_out_schema = ::arrow::ImportSchema(&out_c_schema).ValueOrDie();
+
+   ArrowArray out_array = std::exchange(result.value(), ArrowArray{});
+   return ::arrow::ImportRecordBatch(&out_array, arrow_out_schema).ValueOrDie();
+ }
+
+ /// Check that the nanoarrow and Arrow compute backends both produce
+ /// `expected_json`, and that they agree with each other.
+ void ExpectProjectBatch(const ::arrow::RecordBatch& batch,
+                         const std::vector<int32_t>& alive_indices,
+                         const Schema& required_schema, const Schema& projected_schema,
+                         std::string_view expected_json) {
+   auto expected = MakeBatch(projected_schema, expected_json);
+   auto nanoarrow =
+       RunProjectBatch(batch, alive_indices, required_schema, projected_schema, nullptr);
+   auto arrow_compute = RunProjectBatch(batch, alive_indices, required_schema,
+                                        projected_schema, ArrowComputeFunction());
+   // RunProjectBatch has already recorded why a projection failed.
+   ASSERT_NE(nanoarrow, nullptr);
+   ASSERT_NE(arrow_compute, nullptr);
+
+   EXPECT_TRUE(nanoarrow->Equals(*expected)) << "nanoarrow:\n"
+                                             << nanoarrow->ToString() << "expected:\n"
+                                             << expected->ToString();
+   EXPECT_TRUE(arrow_compute->Equals(*expected))
+       << "arrow_compute:\n"
+       << arrow_compute->ToString() << "expected:\n"
+       << expected->ToString();
+   EXPECT_TRUE(nanoarrow->Equals(*arrow_compute))
+       << "nanoarrow:\n"
+       << nanoarrow->ToString() << "arrow_compute:\n"
+       << arrow_compute->ToString();
+ }
+
+ /// Schema `{1: id int32 required, 2: name string, 3: score float64}`.
+ std::shared_ptr<Schema> MakeFullSchema() {
+   return std::make_shared<Schema>(
+       std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                SchemaField::MakeOptional(2, "name", string()),
+                                SchemaField::MakeOptional(3, "score", float64())});
+ }
+
+ } // namespace
+
+ TEST(ProjectBatchTest, ProjectSelectedRowsWithoutColumnProjection) {
+   // Row filtering only: keep rows 0 and 2 and every column.
+   const auto full_schema = MakeFullSchema();
+   const auto input =
+       MakeBatch(*full_schema, R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0],[4,"d",4.0]])");
+   const std::vector<int32_t> kept_rows{0, 2};
+
+   ExpectProjectBatch(*input, kept_rows, *full_schema, *full_schema,
+                      R"([[1,"a",1.0],[3,"c",3.0]])");
+ }
+
+ TEST(ProjectBatchTest, ProjectColumnsWithoutRowFiltering) {
+   // Column projection only: keep every row but drop the trailing `score` column.
+   const auto full_schema = MakeFullSchema();
+   const auto id_and_name = std::make_shared<Schema>(std::vector<SchemaField>{
+       SchemaField::MakeRequired(1, "id", int32()),
+       SchemaField::MakeOptional(2, "name", string())});
+   const auto input = MakeBatch(*full_schema, R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0]])");
+   const std::vector<int32_t> every_row{0, 1, 2};
+
+   ExpectProjectBatch(*input, every_row, *full_schema, *id_and_name,
+                      R"([[1,"a"],[2,"b"],[3,"c"]])");
+ }
+
+ TEST(ProjectBatchTest, ProjectSelectedRowsAndReorderColumns) {
+   const auto full_schema = MakeFullSchema();
+   // The output lists score(3) ahead of name(2) and omits id(1) entirely.
+   const auto score_then_name = std::make_shared<Schema>(std::vector<SchemaField>{
+       SchemaField::MakeOptional(3, "score", float64()),
+       SchemaField::MakeOptional(2, "name", string())});
+   const auto input = MakeBatch(*full_schema, R"([[1,"a",1.0],[2,"b",2.0],[3,"c",3.0]])");
+   const std::vector<int32_t> last_two_rows{1, 2};
+
+   ExpectProjectBatch(*input, last_two_rows, *full_schema, *score_then_name,
+                      R"([[2.0,"b"],[3.0,"c"]])");
+ }
+
+ TEST(ProjectBatchTest, NullValues) {
+   // Null strings in the selected rows must survive projection unchanged.
+   const auto id_and_name = std::make_shared<Schema>(std::vector<SchemaField>{
+       SchemaField::MakeRequired(1, "id", int32()),
+       SchemaField::MakeOptional(2, "name", string())});
+   const auto input = MakeBatch(*id_and_name, R"([[1,null],[2,"b"],[3,null]])");
+   const std::vector<int32_t> null_name_rows{0, 2};
+
+   ExpectProjectBatch(*input, null_name_rows, *id_and_name, *id_and_name,
+                      R"([[1,null],[3,null]])");
+ }
+
+ TEST(ProjectBatchTest, EmptyRowSelection) {
+   // Selecting no rows must produce an empty batch in the same schema.
+   const auto full_schema = MakeFullSchema();
+   const auto input = MakeBatch(*full_schema, R"([[1,"a",1.0],[2,"b",2.0]])");
+   const std::vector<int32_t> no_rows;
+
+   ExpectProjectBatch(*input, no_rows, *full_schema, *full_schema, R"([])");
+ }
+
+ TEST(ProjectBatchTest, ProjectionRejectsNestedPruning) {
+   // `person` carries {name, age} on input but only {name} on output, which is a
+   // nested prune. ProjectionContext only supports top-level field selection.
+   const Schema input_schema(std::vector<SchemaField>{SchemaField::MakeOptional(
+       1, "person",
+       std::make_shared<StructType>(std::vector<SchemaField>{
+           SchemaField::MakeOptional(2, "name", string()),
+           SchemaField::MakeOptional(3, "age", int32()),
+       }))});
+   const Schema output_schema(std::vector<SchemaField>{SchemaField::MakeOptional(
+       1, "person",
+       std::make_shared<StructType>(std::vector<SchemaField>{
+           SchemaField::MakeOptional(2, "name", string()),
+       }))});
+
+   auto context = ProjectionContext::Make(input_schema, output_schema, nullptr);
+   EXPECT_THAT(context, IsError(ErrorKind::kInvalidArgument));
+ }
+
+} // namespace iceberg::internal
diff --git a/src/iceberg/test/file_scan_task_reader_test.cc
b/src/iceberg/test/file_scan_task_reader_test.cc
new file mode 100644
index 00000000..1630a108
--- /dev/null
+++ b/src/iceberg/test/file_scan_task_reader_test.cc
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/file_scan_task_reader.h"
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <arrow/record_batch.h>
+#include <arrow/table.h>
+#include <parquet/arrow/writer.h>
+#include <parquet/metadata.h>
+#include <parquet/properties.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/arrow/arrow_register.h"
+#include "iceberg/arrow_c_data_guard_internal.h"
+#include "iceberg/arrow_c_data_util_internal.h"
+#include "iceberg/data/equality_delete_writer.h"
+#include "iceberg/data/position_delete_writer.h"
+#include "iceberg/file_format.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/temp_file_test_base.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+/// \brief Owns an exported Arrow C schema/array pair and releases both when
+/// destroyed.
+///
+/// Move-only: the move constructor copies the C structs and clears the
+/// source's `release` callbacks, so exactly one object releases them. Copy
+/// and move assignment are deleted.
+struct ExportedBatch {
+  ArrowSchema schema{};
+  ArrowArray array{};
+
+  ExportedBatch() = default;
+  ExportedBatch(const ExportedBatch&) = delete;
+  ExportedBatch& operator=(const ExportedBatch&) = delete;
+
+  ExportedBatch(ExportedBatch&& other) noexcept
+      : schema(other.schema), array(other.array) {
+    other.array.release = nullptr;
+    other.schema.release = nullptr;
+  }
+  ExportedBatch& operator=(ExportedBatch&& other) noexcept = delete;
+
+  ~ExportedBatch() {
+    // The array is released before the schema.
+    ReleaseIfOwned(array);
+    ReleaseIfOwned(schema);
+  }
+
+ private:
+  // Invokes the C release callback if this object still owns the struct.
+  template <typename CStruct>
+  static void ReleaseIfOwned(CStruct& c_struct) {
+    if (c_struct.release != nullptr) {
+      c_struct.release(&c_struct);
+    }
+  }
+};
+
+}  // namespace
+
+class FileScanTaskReaderTest : public TempFileTestBase {
+ protected:
+  static void SetUpTestSuite() {
+    // Register the Arrow and Parquet integrations once for the whole suite.
+    arrow::RegisterAll();
+    parquet::RegisterAll();
+  }
+
+  void SetUp() override {
+    TempFileTestBase::SetUp();
+    file_io_ = arrow::ArrowFileSystemFileIO::MakeLocalFileIO();
+    partition_spec_ = PartitionSpec::Unpartitioned();
+    // Table schema: id (required), name, category. The default projection
+    // keeps only id and name, so category is a non-projected column.
+    table_schema_ = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", string()),
+                                 SchemaField::MakeOptional(3, "category", string())},
+        /*schema_id=*/2);
+    projected_schema_ = std::make_shared<Schema>(
+        std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                                 SchemaField::MakeOptional(2, "name", string())},
+        table_schema_->schema_id());
+  }
+
+  /// \brief Build an exported Arrow C batch (schema + struct array) from JSON
+  /// rows laid out according to `schema`.
+  Result<ExportedBatch> MakeBatch(const Schema& schema,
+                                  const std::string& json_data) const {
+    ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(schema));
+    auto struct_type = ::arrow::struct_(arrow_schema->fields());
+    auto array_result = ::arrow::json::ArrayFromJSONString(struct_type, json_data);
+    if (!array_result.ok()) {
+      return UnknownError(array_result.status().ToString());
+    }
+
+    ExportedBatch batch;
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &batch.schema));
+    auto export_status =
+        ::arrow::ExportArray(*array_result.MoveValueUnsafe(), &batch.array);
+    if (!export_status.ok()) {
+      return UnknownError(export_status.ToString());
+    }
+    // ExportedBatch is move-only; hand ownership to the Result explicitly.
+    return std::move(batch);
+  }
+
+  /// \brief Convert an Iceberg schema to an arrow::Schema through the Arrow C
+  /// data interface.
+  Result<std::shared_ptr<::arrow::Schema>> MakeArrowSchema(const Schema& schema) const {
+    // Zero-initialize so the struct never holds indeterminate values (notably
+    // `release`) if ToArrowSchema fails before populating it.
+    ArrowSchema c_schema{};
+    ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &c_schema));
+    auto arrow_schema_result = ::arrow::ImportSchema(&c_schema);
+    if (!arrow_schema_result.ok()) {
+      return UnknownError(arrow_schema_result.status().ToString());
+    }
+    return arrow_schema_result.MoveValueUnsafe();
+  }
+
+  /// \brief Build an arrow::RecordBatch from JSON rows matching `arrow_schema`.
+  Result<std::shared_ptr<::arrow::RecordBatch>> MakeRecordBatch(
+      const std::shared_ptr<::arrow::Schema>& arrow_schema,
+      const std::string& json_data) const {
+    auto struct_type = ::arrow::struct_(arrow_schema->fields());
+    auto array_result = ::arrow::json::ArrayFromJSONString(struct_type, json_data);
+    if (!array_result.ok()) {
+      return UnknownError(array_result.status().ToString());
+    }
+
+    auto batch_result =
+        ::arrow::RecordBatch::FromStructArray(array_result.MoveValueUnsafe());
+    if (!batch_result.ok()) {
+      return UnknownError(batch_result.status().ToString());
+    }
+    return batch_result.MoveValueUnsafe();
+  }
+
+  /// \brief Write `json_data` as a single-table Parquet file in a new temp path.
+  /// \param row_group_size Chunk size passed to parquet::arrow::WriteTable.
+  /// \return The path of the written file.
+  Result<std::string> CreateParquetDataFile(std::shared_ptr<Schema> schema,
+                                            const std::string& json_data,
+                                            int64_t row_group_size = 1024) {
+    auto path = CreateNewTempFilePathWithSuffix(".parquet");
+
+    ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(*schema));
+    ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeRecordBatch(arrow_schema, json_data));
+    auto table_result =
+        ::arrow::Table::FromRecordBatches(arrow_schema, {std::move(batch)});
+    if (!table_result.ok()) {
+      return UnknownError(table_result.status().ToString());
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto outfile, arrow::OpenArrowOutputStream(file_io_, path));
+    auto write_status = ::parquet::arrow::WriteTable(*table_result.MoveValueUnsafe(),
+                                                     ::arrow::default_memory_pool(),
+                                                     outfile, row_group_size);
+    if (!write_status.ok()) {
+      return UnknownError(write_status.ToString());
+    }
+    if (auto close_status = outfile->Close(); !close_status.ok()) {
+      return UnknownError(close_status.ToString());
+    }
+    return path;
+  }
+
+  /// \brief Write each JSON batch as its own record batch into one Parquet file.
+  ///
+  /// Fails unless the file ends up with exactly one row group per JSON batch,
+  /// so tests can rely on a known row-group layout.
+  /// \return The path of the written file.
+  Result<std::string> CreateParquetDataFile(std::shared_ptr<Schema> schema,
+                                            const std::vector<std::string>& json_batches,
+                                            int64_t max_row_group_length) {
+    ICEBERG_PRECHECK(!json_batches.empty(), "Parquet data file must have a batch");
+
+    auto path = CreateNewTempFilePathWithSuffix(".parquet");
+    ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, MakeArrowSchema(*schema));
+    ICEBERG_ASSIGN_OR_RAISE(auto outfile, arrow::OpenArrowOutputStream(file_io_, path));
+
+    auto properties_builder = ::parquet::WriterProperties::Builder();
+    auto writer_properties =
+        properties_builder.compression(::parquet::Compression::UNCOMPRESSED)
+            ->max_row_group_length(max_row_group_length)
+            ->build();
+    auto writer_result = ::parquet::arrow::FileWriter::Open(
+        *arrow_schema, ::arrow::default_memory_pool(), outfile, writer_properties);
+    if (!writer_result.ok()) {
+      return UnknownError(writer_result.status().ToString());
+    }
+    auto writer = writer_result.MoveValueUnsafe();
+
+    for (const auto& json_batch : json_batches) {
+      ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeRecordBatch(arrow_schema, json_batch));
+      if (auto write_status = writer->WriteRecordBatch(*batch); !write_status.ok()) {
+        return UnknownError(write_status.ToString());
+      }
+    }
+    if (auto close_status = writer->Close(); !close_status.ok()) {
+      return UnknownError(close_status.ToString());
+    }
+    const int num_row_groups = writer->metadata()->num_row_groups();
+    // Close the output stream before validating so a layout mismatch does not
+    // leave the file open.
+    if (auto close_status = outfile->Close(); !close_status.ok()) {
+      return UnknownError(close_status.ToString());
+    }
+    ICEBERG_PRECHECK(num_row_groups == static_cast<int>(json_batches.size()),
+                     "Expected {} Parquet row groups, got {}", json_batches.size(),
+                     num_row_groups);
+    return path;
+  }
+
+  /// \brief Describe an existing Parquet file at `path` as a data-content
+  /// DataFile, reading its size through `file_io_`.
+  Result<std::shared_ptr<DataFile>> DescribeParquetDataFile(const std::string& path,
+                                                            int64_t record_count) {
+    ICEBERG_ASSIGN_OR_RAISE(auto input_file, file_io_->NewInputFile(path));
+    ICEBERG_ASSIGN_OR_RAISE(auto size, input_file->Size());
+    return std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kData,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .record_count = record_count,
+        .file_size_in_bytes = size,
+    });
+  }
+
+  /// \brief Write a single-batch Parquet data file and return its DataFile.
+  /// \param record_count Recorded as-is; callers must keep it consistent with
+  ///        the number of rows in `json_data`.
+  Result<std::shared_ptr<DataFile>> MakeDataFile(std::shared_ptr<Schema> schema,
+                                                 const std::string& json_data,
+                                                 int64_t record_count = 3) {
+    ICEBERG_ASSIGN_OR_RAISE(auto path,
+                            CreateParquetDataFile(std::move(schema), json_data));
+    return DescribeParquetDataFile(path, record_count);
+  }
+
+  /// \brief Write a multi-row-group Parquet data file (one row group per JSON
+  /// batch) and return its DataFile.
+  Result<std::shared_ptr<DataFile>> MakeDataFile(
+      std::shared_ptr<Schema> schema, const std::vector<std::string>& json_batches,
+      int64_t record_count, int64_t max_row_group_length) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto path,
+        CreateParquetDataFile(std::move(schema), json_batches, max_row_group_length));
+    return DescribeParquetDataFile(path, record_count);
+  }
+
+  /// \brief Write a position delete file at `path` deleting `positions` of
+  /// `data_path`, and return the written delete file's metadata.
+  Result<std::shared_ptr<DataFile>> MakePositionDeleteFile(
+      const std::string& path, const std::vector<int64_t>& positions,
+      const std::string& data_path) {
+    PositionDeleteWriterOptions options{
+        .path = path,
+        .schema = table_schema_,
+        .spec = partition_spec_,
+        .partition = PartitionValues{},
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .flush_threshold = 10000,
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer, PositionDeleteWriter::Make(options));
+    for (int64_t pos : positions) {
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteDelete(data_path, pos));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata());
+    // Guard the index below instead of reading past an empty vector.
+    ICEBERG_PRECHECK(!metadata.data_files.empty(),
+                     "Position delete writer produced no data files");
+    return metadata.data_files[0];
+  }
+
+  /// \brief Write an equality delete file at `path` from JSON rows of `schema`,
+  /// keyed on `equality_field_ids`, and return its metadata.
+  Result<std::shared_ptr<DataFile>> MakeEqualityDeleteFile(
+      const std::string& path, std::shared_ptr<Schema> schema,
+      const std::string& json_data, std::vector<int32_t> equality_field_ids) {
+    EqualityDeleteWriterOptions options{
+        .path = path,
+        .schema = schema,
+        .spec = partition_spec_,
+        .partition = PartitionValues{},
+        .format = FileFormatType::kParquet,
+        .io = file_io_,
+        .equality_field_ids = std::move(equality_field_ids),
+        .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+    };
+
+    ICEBERG_ASSIGN_OR_RAISE(auto writer, EqualityDeleteWriter::Make(options));
+    ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeBatch(*schema, json_data));
+    ICEBERG_RETURN_UNEXPECTED(writer->Write(&batch.array));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata());
+    // Guard the index below instead of reading past an empty vector.
+    ICEBERG_PRECHECK(!metadata.data_files.empty(),
+                     "Equality delete writer produced no data files");
+    return metadata.data_files[0];
+  }
+
+  /// \brief Consume `stream` and assert it yields exactly one batch equal to
+  /// `expected_json` (interpreted with the actual batch's schema).
+  ///
+  /// Failures are reported through gtest assertions rather than ValueOrDie()
+  /// so a bad stream fails the test instead of aborting the test binary.
+  void VerifyStream(struct ArrowArrayStream* stream, std::string_view expected_json) {
+    auto reader_result = ::arrow::ImportRecordBatchReader(stream);
+    ASSERT_TRUE(reader_result.ok()) << reader_result.status().message();
+    auto record_batch_reader = reader_result.MoveValueUnsafe();
+
+    auto result = record_batch_reader->Next();
+    ASSERT_TRUE(result.ok()) << result.status().message();
+    auto actual_batch = result.ValueOrDie();
+    ASSERT_NE(actual_batch, nullptr) << "Stream is exhausted but expected more data.";
+
+    auto struct_type = ::arrow::struct_(actual_batch->schema()->fields());
+    auto expected_array_result =
+        ::arrow::json::ArrayFromJSONString(struct_type, expected_json);
+    ASSERT_TRUE(expected_array_result.ok())
+        << expected_array_result.status().message();
+    auto expected_batch_result = ::arrow::RecordBatch::FromStructArray(
+        expected_array_result.MoveValueUnsafe());
+    ASSERT_TRUE(expected_batch_result.ok())
+        << expected_batch_result.status().message();
+    auto expected_batch = expected_batch_result.MoveValueUnsafe();
+
+    ASSERT_TRUE(actual_batch->Equals(*expected_batch))
+        << "Actual batch:\n"
+        << actual_batch->ToString() << "\nExpected batch:\n"
+        << expected_batch->ToString();
+
+    result = record_batch_reader->Next();
+    ASSERT_TRUE(result.ok()) << result.status().message();
+    ASSERT_EQ(result.ValueOrDie(), nullptr) << "Reader returned an extra batch.";
+  }
+
+  /// \brief Read `data_file` with the plain format reader and assert the batch
+  /// lengths match `expected_lengths` exactly, in order.
+  void VerifyDataReaderBatchLengths(const DataFile& data_file,
+                                    const std::vector<int64_t>& expected_lengths,
+                                    ReaderProperties properties = {}) {
+    ReaderOptions options{
+        .path = data_file.file_path,
+        .length = static_cast<size_t>(data_file.file_size_in_bytes),
+        .io = file_io_,
+        .projection = table_schema_,
+        .properties = std::move(properties),
+    };
+    ICEBERG_UNWRAP_OR_FAIL(auto reader,
+                           ReaderFactoryRegistry::Open(data_file.file_format, options));
+
+    for (int64_t expected_length : expected_lengths) {
+      ICEBERG_UNWRAP_OR_FAIL(auto maybe_batch, reader->Next());
+      ASSERT_TRUE(maybe_batch.has_value()) << "Reader is exhausted too early.";
+
+      ArrowArray batch = std::move(maybe_batch.value());
+      internal::ArrowArrayGuard batch_guard(&batch);
+      ASSERT_EQ(batch.length, expected_length);
+    }
+
+    ICEBERG_UNWRAP_OR_FAIL(auto maybe_batch, reader->Next());
+    ASSERT_FALSE(maybe_batch.has_value()) << "Reader returned an extra batch.";
+    ASSERT_THAT(reader->Close(), IsOk());
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<PartitionSpec> partition_spec_;
+  std::shared_ptr<Schema> table_schema_;
+  std::shared_ptr<Schema> projected_schema_;
+};
+
+TEST_F(FileScanTaskReaderTest, OpenWithoutDeletesReadsProjectedSchema) {
+  // No delete files: every row is returned, restricted to the projected
+  // columns (id, name); the category column is not part of the output.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])"));
+  FileScanTask task(data_file);
+
+  FileScanTaskReader::Options reader_options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(reader_options)));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, reader->Open(task));
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyStream(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, OpenWithPositionDeletesFiltersRowsAndPrunesPos) {
+  // Position 1 (the "Bar" row) is deleted. The output must contain only the
+  // projected columns, with no row-position column leaking through.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto pos_delete, MakePositionDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"),
+                                              {1}, data_file->file_path));
+  FileScanTask task(data_file, {pos_delete});
+
+  FileScanTaskReader::Options reader_options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(reader_options)));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, reader->Open(task));
+
+  ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, OpenWithEqualityDeletesAddsAndPrunesDeleteOnlyColumns) {
+  // The equality delete is keyed on category (field 3), which is not in the
+  // projection. The "red" delete row removes "Bar", and category must not
+  // appear in the output.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_delete,
+      MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), table_schema_,
+                             R"([[0, "unused", "red"]])", {3}));
+  FileScanTask task(data_file, {eq_delete});
+
+  FileScanTaskReader::Options reader_options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(reader_options)));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, reader->Open(task));
+
+  ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, OpenWithEqualityDeletesKeepsInputBatchWhenAllRowsAlive) {
+  // The equality delete targets id 99, which matches no row, so all three
+  // rows must come through.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(table_schema_,
+                   R"([[1, "Foo", "blue"], [2, "Bar", "red"], [3, "Baz", "green"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_delete,
+      MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), table_schema_,
+                             R"([[99, "unused", "unused"]])", {1}));
+  FileScanTask task(data_file, {eq_delete});
+
+  FileScanTaskReader::Options reader_options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(reader_options)));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, reader->Open(task));
+
+  ASSERT_NO_FATAL_FAILURE(
+      VerifyStream(&stream, R"([[1, "Foo"], [2, "Bar"], [3, "Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, OpenWithSchemasResolvesDroppedEqualityField) {
+  // Data and delete files were written with an older schema (id 1) that has
+  // field 4, which the current schema (id 2) no longer contains. The delete
+  // keys on field 4, so the reader must find it through `schemas`.
+  auto old_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", string()),
+                               SchemaField::MakeOptional(4, "dropped_value", string())},
+      /*schema_id=*/1);
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(2, "name", string())},
+      /*schema_id=*/2);
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_file,
+      MakeDataFile(old_schema,
+                   R"([[1, "Foo", "keep"], [2, "Bar", "gone"], [3, "Baz", "keep"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_delete,
+      MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), old_schema,
+                             R"([[0, "unused", "gone"]])", {4}));
+  FileScanTask task(data_file, {eq_delete});
+
+  FileScanTaskReader::Options reader_options{
+      .io = file_io_,
+      .table_schema = current_schema,
+      .schemas = {current_schema, old_schema},
+      .projected_schema = current_schema,
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(reader_options)));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, reader->Open(task));
+
+  ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[1, "Foo"], [3, "Baz"]])"));
+}
+
+TEST_F(FileScanTaskReaderTest, OpenWithMixedDeletesSkipsFullyDeletedBatches) {
+  // Two row groups: [Foo, Bar] and [Baz]. Position deletes remove both rows of
+  // the first group, and the equality delete ("yellow") matches nothing, so
+  // the stream must yield only the second batch.
+  const std::vector<std::string> row_groups{
+      R"([[1, "Foo", "blue"], [2, "Bar", "red"]])",
+      R"([[3, "Baz", "green"]])",
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto data_file,
+                         MakeDataFile(table_schema_, row_groups, /*record_count=*/3,
+                                      /*max_row_group_length=*/2));
+
+  // Parquet can coalesce row groups when the reader batch size is larger.
+  ReaderProperties properties;
+  properties.Set(ReaderProperties::kBatchSize, int64_t{2});
+  ASSERT_NO_FATAL_FAILURE(VerifyDataReaderBatchLengths(*data_file, {2, 1}, properties));
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto pos_delete, MakePositionDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"),
+                                              {0, 1}, data_file->file_path));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_delete,
+      MakeEqualityDeleteFile(CreateNewTempFilePathWithSuffix(".parquet"), table_schema_,
+                             R"([[0, "unused", "yellow"]])", {3}));
+  FileScanTask task(data_file, {pos_delete, eq_delete});
+
+  FileScanTaskReader::Options reader_options{
+      .io = file_io_,
+      .table_schema = table_schema_,
+      .schemas = {table_schema_},
+      .projected_schema = projected_schema_,
+      .properties = properties.configs(),
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto reader, FileScanTaskReader::Make(std::move(reader_options)));
+  ICEBERG_UNWRAP_OR_FAIL(auto stream, reader->Open(task));
+
+  ASSERT_NO_FATAL_FAILURE(VerifyStream(&stream, R"([[3, "Baz"]])"));
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/file_scan_task_test.cc
b/src/iceberg/test/file_scan_task_test.cc
index 55bc6a11..cb945ba1 100644
--- a/src/iceberg/test/file_scan_task_test.cc
+++ b/src/iceberg/test/file_scan_task_test.cc
@@ -28,6 +28,7 @@
#include <parquet/metadata.h>
#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/data/file_scan_task_reader.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/parquet/parquet_register.h"
@@ -109,6 +110,20 @@ class FileScanTaskTest : public TempFileTestBase {
return data_file;
}
+  // Opens `task` through a FileScanTaskReader whose table schema is a fixed
+  // (id, name) schema, and returns the Arrow stream produced by Open().
+  Result<ArrowArrayStream> OpenTask(const FileScanTask& task,
+                                    std::shared_ptr<Schema> projected_schema) {
+    auto table_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(1, "id", int32()),
+        SchemaField::MakeOptional(2, "name", string()),
+    });
+    FileScanTaskReader::Options reader_options{
+        .io = file_io_,
+        .table_schema = std::move(table_schema),
+        .projected_schema = std::move(projected_schema),
+    };
+    ICEBERG_ASSIGN_OR_RAISE(auto task_reader,
+                            FileScanTaskReader::Make(std::move(reader_options)));
+    return task_reader->Open(task);
+  }
+
// Helper method to verify the content of the next batch from an
ArrowArrayStream.
void VerifyStreamNextBatch(struct ArrowArrayStream* stream,
std::string_view expected_json) {
@@ -154,9 +169,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) {
FileScanTask task(data_file);
- auto stream_result = task.ToArrow(file_io_, projected_schema);
- ASSERT_THAT(stream_result, IsOk());
- auto stream = std::move(stream_result.value());
+ ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema));
ASSERT_NO_FATAL_FAILURE(
VerifyStreamNextBatch(&stream, R"([[1, "Foo"], [2, "Bar"], [3,
"Baz"]])"));
@@ -171,9 +184,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
FileScanTask task(data_file);
- auto stream_result = task.ToArrow(file_io_, projected_schema);
- ASSERT_THAT(stream_result, IsOk());
- auto stream = std::move(stream_result.value());
+ ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema));
ASSERT_NO_FATAL_FAILURE(
VerifyStreamNextBatch(&stream, R"([["Foo", null], ["Bar", null], ["Baz",
null]])"));
@@ -188,9 +199,7 @@ TEST_F(FileScanTaskTest, ReadEmptyFile) {
FileScanTask task(data_file);
- auto stream_result = task.ToArrow(file_io_, projected_schema);
- ASSERT_THAT(stream_result, IsOk());
- auto stream = std::move(stream_result.value());
+ ICEBERG_UNWRAP_OR_FAIL(auto stream, OpenTask(task, projected_schema));
// The stream should be immediately exhausted
ASSERT_NO_FATAL_FAILURE(VerifyStreamExhausted(&stream));