This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new deec370f feat: Add support for Arrow LargeListArray in Parquet data
projection (#510)
deec370f is described below
commit deec370f915a6db604c151d881406ff444147ac1
Author: Dipanshu Pandey <[email protected]>
AuthorDate: Thu Jan 15 13:22:09 2026 +0530
feat: Add support for Arrow LargeListArray in Parquet data projection (#510)
## Summary
Add support for projecting Arrow `LargeListArray` (64-bit offsets) in
addition to the existing `ListArray` (32-bit offsets) support. This
enables handling of lists with more than 2^31-1 total child elements.
## Changes
- Add templated `ProjectListArrayImpl<>` function for code reuse between
list types
- Add `ProjectLargeListArray` wrapper function for 64-bit offset lists
- Update `ProjectNestedArray` to dispatch to appropriate handler based
on Arrow type
- Add test cases for `LargeListArray` projection
## Test Results
```
[ RUN ] ProjectRecordBatchTest.LargeListOfIntegers
[ OK ] ProjectRecordBatchTest.LargeListOfIntegers (0 ms)
[ RUN ] ProjectRecordBatchTest.LargeListOfStructs
[ OK ] ProjectRecordBatchTest.LargeListOfStructs (0 ms)
```
All 59 tests pass.
## Test Plan
- [x] Added `LargeListOfIntegers` test - verifies projection of a
`LargeListArray` with `int32` elements
- [x] Added `LargeListOfStructs` test - verifies projection of
`LargeListArray` with nested struct elements
- [x] Verified all existing tests still pass
Closes #502
---
src/iceberg/parquet/parquet_data_util.cc | 58 ++++++++---
src/iceberg/test/parquet_data_test.cc | 159 +++++++++++++++++++++++++++++++
2 files changed, 203 insertions(+), 14 deletions(-)
diff --git a/src/iceberg/parquet/parquet_data_util.cc
b/src/iceberg/parquet/parquet_data_util.cc
index 4a8b2cf5..14d20ff9 100644
--- a/src/iceberg/parquet/parquet_data_util.cc
+++ b/src/iceberg/parquet/parquet_data_util.cc
@@ -148,10 +148,13 @@ Result<std::shared_ptr<::arrow::Array>>
ProjectStructArray(
return output_array;
}
-/// FIXME: Support ::arrow::LargeListArray.
-Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
- const std::shared_ptr<::arrow::ListArray>& list_array,
- const std::shared_ptr<::arrow::ListType>& output_list_type, const
ListType& list_type,
+/// Templated implementation for projecting list arrays.
+/// Works with both ListArray/ListType (32-bit offsets) and
+/// LargeListArray/LargeListType (64-bit offsets).
+template <typename ArrowListArrayType, typename ArrowListType>
+Result<std::shared_ptr<::arrow::Array>> ProjectListArrayImpl(
+ const std::shared_ptr<ArrowListArrayType>& list_array,
+ const std::shared_ptr<ArrowListType>& output_list_type, const ListType&
list_type,
std::span<const FieldProjection> projections,
const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool*
pool) {
if (projections.size() != 1) {
@@ -176,12 +179,30 @@ Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
ProjectPrimitiveArray(list_array->values(), output_element_type,
pool));
}
- return std::make_shared<::arrow::ListArray>(
+ return std::make_shared<ArrowListArrayType>(
output_list_type, list_array->length(), list_array->value_offsets(),
std::move(projected_values), list_array->null_bitmap(),
list_array->null_count(),
list_array->offset());
}
+Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
+ const std::shared_ptr<::arrow::ListArray>& list_array,
+ const std::shared_ptr<::arrow::ListType>& output_list_type, const
ListType& list_type,
+ std::span<const FieldProjection> projections,
+ const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool*
pool) {
+ return ProjectListArrayImpl(list_array, output_list_type, list_type,
projections,
+ metadata_context, pool);
+}
+
+Result<std::shared_ptr<::arrow::Array>> ProjectLargeListArray(
+ const std::shared_ptr<::arrow::LargeListArray>& list_array,
+ const std::shared_ptr<::arrow::LargeListType>& output_list_type,
+ const ListType& list_type, std::span<const FieldProjection> projections,
+ const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool*
pool) {
+ return ProjectListArrayImpl(list_array, output_list_type, list_type,
projections,
+ metadata_context, pool);
+}
+
Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
const std::shared_ptr<::arrow::MapArray>& map_array,
const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType&
map_type,
@@ -249,17 +270,26 @@ Result<std::shared_ptr<::arrow::Array>>
ProjectNestedArray(
projections, metadata_context, pool);
}
case TypeId::kList: {
- if (output_arrow_type->id() != ::arrow::Type::LIST) {
- return InvalidSchema("Expected list type, got: {}",
- output_arrow_type->ToString());
+ const auto& list_type = internal::checked_cast<const
ListType&>(nested_type);
+
+ if (output_arrow_type->id() == ::arrow::Type::LIST) {
+ auto list_array =
internal::checked_pointer_cast<::arrow::ListArray>(array);
+ auto output_list_type =
+
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
+ return ProjectListArray(list_array, output_list_type, list_type,
projections,
+ metadata_context, pool);
}
- auto list_array =
internal::checked_pointer_cast<::arrow::ListArray>(array);
- auto output_list_type =
- internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
- const auto& list_type = internal::checked_cast<const
ListType&>(nested_type);
- return ProjectListArray(list_array, output_list_type, list_type,
projections,
- metadata_context, pool);
+ if (output_arrow_type->id() == ::arrow::Type::LARGE_LIST) {
+ auto list_array =
internal::checked_pointer_cast<::arrow::LargeListArray>(array);
+ auto output_list_type =
+
internal::checked_pointer_cast<::arrow::LargeListType>(output_arrow_type);
+ return ProjectLargeListArray(list_array, output_list_type, list_type,
projections,
+ metadata_context, pool);
+ }
+
+ return InvalidSchema("Expected list or large_list type, got: {}",
+ output_arrow_type->ToString());
}
case TypeId::kMap: {
if (output_arrow_type->id() != ::arrow::Type::MAP) {
diff --git a/src/iceberg/test/parquet_data_test.cc
b/src/iceberg/test/parquet_data_test.cc
index d7b5a6ed..9ed28114 100644
--- a/src/iceberg/test/parquet_data_test.cc
+++ b/src/iceberg/test/parquet_data_test.cc
@@ -18,6 +18,9 @@
*/
#include <arrow/array.h>
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
#include <arrow/c/bridge.h>
#include <arrow/json/from_string.h>
#include <arrow/record_batch.h>
@@ -503,4 +506,160 @@ TEST(ProjectRecordBatchTest, EmptyRecordBatch) {
VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json,
input_json));
}
+TEST(ProjectRecordBatchTest, LargeListOfIntegers) {
+ // Create a LargeListArray manually (JSON parsing creates regular ListArray)
+ auto value_builder = std::make_shared<::arrow::Int32Builder>();
+ ::arrow::LargeListBuilder list_builder(::arrow::default_memory_pool(),
value_builder,
+
::arrow::large_list(::arrow::int32()));
+
+ // Build: [[1, 2, 3], [4, 5]]
+ ASSERT_TRUE(list_builder.Append().ok());
+ ASSERT_TRUE(value_builder->Append(1).ok());
+ ASSERT_TRUE(value_builder->Append(2).ok());
+ ASSERT_TRUE(value_builder->Append(3).ok());
+
+ ASSERT_TRUE(list_builder.Append().ok());
+ ASSERT_TRUE(value_builder->Append(4).ok());
+ ASSERT_TRUE(value_builder->Append(5).ok());
+
+ auto large_list_array_result = list_builder.Finish();
+ ASSERT_TRUE(large_list_array_result.ok());
+ auto large_list_array = large_list_array_result.ValueOrDie();
+
+ // Create input record batch with the LargeListArray
+ auto input_arrow_schema =
+ ::arrow::schema({::arrow::field("numbers",
::arrow::large_list(::arrow::int32()))});
+ auto input_record_batch =
+ ::arrow::RecordBatch::Make(input_arrow_schema, 2, {large_list_array});
+
+ // Create Iceberg schema (uses ListType which maps to both LIST and
LARGE_LIST)
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(
+ 1, "numbers",
+ std::make_shared<ListType>(SchemaField::MakeRequired(2, "element",
int32()))),
+ });
+
+ // Create schema projection
+ auto schema_projection_result =
+ Project(iceberg_schema, iceberg_schema, /*prune_source=*/false);
+ ASSERT_THAT(schema_projection_result, IsOk());
+ auto schema_projection = std::move(schema_projection_result.value());
+
+ // Create output Arrow schema with LargeListType
+ auto output_arrow_schema =
+ ::arrow::schema({::arrow::field("numbers",
::arrow::large_list(::arrow::int32()))});
+
+ // Project the record batch
+ auto project_result = ProjectRecordBatch(
+ input_record_batch, output_arrow_schema, iceberg_schema,
schema_projection,
+ /*metadata_context=*/{}, ::arrow::default_memory_pool());
+ ASSERT_THAT(project_result, IsOk());
+ auto projected_record_batch = std::move(project_result.value());
+
+ // Verify the result is a LargeListArray
+ ASSERT_EQ(projected_record_batch->num_columns(), 1);
+ ASSERT_EQ(projected_record_batch->column(0)->type()->id(),
::arrow::Type::LARGE_LIST);
+
+ // Verify the values
+ auto projected_array = std::static_pointer_cast<::arrow::LargeListArray>(
+ projected_record_batch->column(0));
+ ASSERT_EQ(projected_array->length(), 2);
+
+ // First list: [1, 2, 3]
+ auto first_list = projected_array->value_slice(0);
+ ASSERT_EQ(first_list->length(), 3);
+ auto first_values =
std::static_pointer_cast<::arrow::Int32Array>(first_list);
+ EXPECT_EQ(first_values->Value(0), 1);
+ EXPECT_EQ(first_values->Value(1), 2);
+ EXPECT_EQ(first_values->Value(2), 3);
+
+ // Second list: [4, 5]
+ auto second_list = projected_array->value_slice(1);
+ ASSERT_EQ(second_list->length(), 2);
+ auto second_values =
std::static_pointer_cast<::arrow::Int32Array>(second_list);
+ EXPECT_EQ(second_values->Value(0), 4);
+ EXPECT_EQ(second_values->Value(1), 5);
+}
+
+TEST(ProjectRecordBatchTest, LargeListOfStructs) {
+ // Create a LargeListArray with struct elements
+ auto name_builder = std::make_shared<::arrow::StringBuilder>();
+ auto age_builder = std::make_shared<::arrow::Int32Builder>();
+ std::vector<std::shared_ptr<::arrow::ArrayBuilder>> field_builders =
{name_builder,
+
age_builder};
+ auto struct_type = ::arrow::struct_(
+ {::arrow::field("name", ::arrow::utf8()), ::arrow::field("age",
::arrow::int32())});
+ auto struct_builder = std::make_shared<::arrow::StructBuilder>(
+ struct_type, ::arrow::default_memory_pool(), field_builders);
+
+ ::arrow::LargeListBuilder list_builder(::arrow::default_memory_pool(),
struct_builder,
+ ::arrow::large_list(struct_type));
+
+ // Build: [[{name: "Alice", age: 30}], [{name: "Bob", age: 25}, {name:
"Carol", age:
+ // 35}]]
+ ASSERT_TRUE(list_builder.Append().ok());
+ ASSERT_TRUE(struct_builder->Append().ok());
+ ASSERT_TRUE(name_builder->Append("Alice").ok());
+ ASSERT_TRUE(age_builder->Append(30).ok());
+
+ ASSERT_TRUE(list_builder.Append().ok());
+ ASSERT_TRUE(struct_builder->Append().ok());
+ ASSERT_TRUE(name_builder->Append("Bob").ok());
+ ASSERT_TRUE(age_builder->Append(25).ok());
+ ASSERT_TRUE(struct_builder->Append().ok());
+ ASSERT_TRUE(name_builder->Append("Carol").ok());
+ ASSERT_TRUE(age_builder->Append(35).ok());
+
+ auto large_list_array_result = list_builder.Finish();
+ ASSERT_TRUE(large_list_array_result.ok());
+ auto large_list_array = large_list_array_result.ValueOrDie();
+
+ // Create input record batch
+ auto input_arrow_schema =
+ ::arrow::schema({::arrow::field("people",
::arrow::large_list(struct_type))});
+ auto input_record_batch =
+ ::arrow::RecordBatch::Make(input_arrow_schema, 2, {large_list_array});
+
+ // Create Iceberg schema
+ Schema iceberg_schema({
+ SchemaField::MakeRequired(1, "people",
+
std::make_shared<ListType>(SchemaField::MakeRequired(
+ 2, "element",
+
std::make_shared<StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name",
string()),
+ SchemaField::MakeRequired(4, "age",
int32()),
+ })))),
+ });
+
+ // Create schema projection
+ auto schema_projection_result =
+ Project(iceberg_schema, iceberg_schema, /*prune_source=*/false);
+ ASSERT_THAT(schema_projection_result, IsOk());
+ auto schema_projection = std::move(schema_projection_result.value());
+
+ // Create output Arrow schema with LargeListType
+ auto output_arrow_schema =
+ ::arrow::schema({::arrow::field("people",
::arrow::large_list(struct_type))});
+
+ // Project the record batch
+ auto project_result = ProjectRecordBatch(
+ input_record_batch, output_arrow_schema, iceberg_schema,
schema_projection,
+ /*metadata_context=*/{}, ::arrow::default_memory_pool());
+ ASSERT_THAT(project_result, IsOk());
+ auto projected_record_batch = std::move(project_result.value());
+
+ // Verify the result is a LargeListArray
+ ASSERT_EQ(projected_record_batch->num_columns(), 1);
+ ASSERT_EQ(projected_record_batch->column(0)->type()->id(),
::arrow::Type::LARGE_LIST);
+
+ auto projected_array = std::static_pointer_cast<::arrow::LargeListArray>(
+ projected_record_batch->column(0));
+ ASSERT_EQ(projected_array->length(), 2);
+
+ // Verify first list has 1 element
+ EXPECT_EQ(projected_array->value_length(0), 1);
+ // Verify second list has 2 elements
+ EXPECT_EQ(projected_array->value_length(1), 2);
+}
+
} // namespace iceberg::parquet