This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 1c67e4c1 feat: eliminate GenericDatum in Avro reader for performance
(#374)
1c67e4c1 is described below
commit 1c67e4c1fa910d6bbcd91b29bc5f721b912de46b
Author: Xinli Shang <[email protected]>
AuthorDate: Mon Dec 15 05:46:24 2025 -0800
feat: eliminate GenericDatum in Avro reader for performance (#374)
Replace GenericDatum intermediate layer with direct Avro decoder access
to improve manifest I/O performance.
Changes:
- Add avro_direct_decoder_internal.h with DecodeAvroToBuilder API
- Add avro_direct_decoder.cc implementing direct Avro→Arrow decoding
- Primitive types: bool, int, long, float, double, string, binary, fixed
- Temporal types: date, time, timestamp
- Logical types: uuid, decimal
- Nested types: struct, list, map
- Union type handling for nullable fields
- Modify avro_reader.cc to use DataFileReaderBase with direct decoder
- Replace DataFileReader<GenericDatum> with DataFileReaderBase
- Use decoder.decodeInt(), decodeLong(), etc. directly
- Remove GenericDatum allocation and extraction overhead
- Update CMakeLists.txt to include new decoder source
Performance improvement:
- Before: Avro binary → GenericDatum → Extract → Arrow (3 steps)
- After: Avro binary → decoder.decodeInt() → Arrow (2 steps)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/avro/CMakeLists.txt | 6 +
src/iceberg/avro/avro_direct_decoder.cc | 593 ++++++++++++++++++++++++
src/iceberg/avro/avro_direct_decoder_internal.h | 87 ++++
src/iceberg/avro/avro_reader.cc | 136 ++++--
src/iceberg/avro/avro_scan.cc | 204 ++++++++
src/iceberg/file_reader.h | 5 +
src/iceberg/test/avro_test.cc | 273 ++++++++++-
src/iceberg/test/temp_file_test_base.h | 9 +-
9 files changed, 1275 insertions(+), 39 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 6e9eb0ba..a0d93967 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -148,6 +148,7 @@ if(ICEBERG_BUILD_BUNDLE)
set(ICEBERG_BUNDLE_SOURCES
arrow/arrow_fs_file_io.cc
avro/avro_data_util.cc
+ avro/avro_direct_decoder.cc
avro/avro_reader.cc
avro/avro_writer.cc
avro/avro_register.cc
diff --git a/src/iceberg/avro/CMakeLists.txt b/src/iceberg/avro/CMakeLists.txt
index f8213038..a7663ba6 100644
--- a/src/iceberg/avro/CMakeLists.txt
+++ b/src/iceberg/avro/CMakeLists.txt
@@ -16,3 +16,9 @@
# under the License.
iceberg_install_all_headers(iceberg/avro)
+
+# avro_scan benchmark executable
+add_executable(avro_scan avro_scan.cc)
+target_link_libraries(avro_scan PRIVATE iceberg_bundle_static)
+set_target_properties(avro_scan PROPERTIES RUNTIME_OUTPUT_DIRECTORY
+
"${CMAKE_BINARY_DIR}/src/iceberg/avro")
diff --git a/src/iceberg/avro/avro_direct_decoder.cc
b/src/iceberg/avro/avro_direct_decoder.cc
new file mode 100644
index 00000000..60f79d21
--- /dev/null
+++ b/src/iceberg/avro/avro_direct_decoder.cc
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <arrow/array/builder_binary.h>
+#include <arrow/array/builder_decimal.h>
+#include <arrow/array/builder_nested.h>
+#include <arrow/array/builder_primitive.h>
+#include <arrow/type.h>
+#include <arrow/util/decimal.h>
+#include <avro/Decoder.hh>
+#include <avro/Node.hh>
+#include <avro/NodeImpl.hh>
+#include <avro/Types.hh>
+
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/avro/avro_direct_decoder_internal.h"
+#include "iceberg/avro/avro_schema_util_internal.h"
+#include "iceberg/schema.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg::avro {
+
+using ::iceberg::arrow::ToErrorKind;
+
+namespace {
+
+/// \brief Forward declaration for mutual recursion.
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
+ const FieldProjection& projection,
+ const SchemaField& projected_field,
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx);
+
+/// \brief Skip an Avro value based on its schema without decoding
+Status SkipAvroValue(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder) {
+ switch (avro_node->type()) {
+ case ::avro::AVRO_NULL:
+ decoder.decodeNull();
+ return {};
+
+ case ::avro::AVRO_BOOL:
+ decoder.decodeBool();
+ return {};
+
+ case ::avro::AVRO_INT:
+ decoder.decodeInt();
+ return {};
+
+ case ::avro::AVRO_LONG:
+ decoder.decodeLong();
+ return {};
+
+ case ::avro::AVRO_FLOAT:
+ decoder.decodeFloat();
+ return {};
+
+ case ::avro::AVRO_DOUBLE:
+ decoder.decodeDouble();
+ return {};
+
+ case ::avro::AVRO_STRING:
+ decoder.skipString();
+ return {};
+
+ case ::avro::AVRO_BYTES:
+ decoder.skipBytes();
+ return {};
+
+ case ::avro::AVRO_FIXED:
+ decoder.skipFixed(avro_node->fixedSize());
+ return {};
+
+ case ::avro::AVRO_RECORD: {
+ // Skip all fields in order
+ for (size_t i = 0; i < avro_node->leaves(); ++i) {
+ ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node->leafAt(i),
decoder));
+ }
+ return {};
+ }
+
+ case ::avro::AVRO_ENUM:
+ decoder.decodeEnum();
+ return {};
+
+ case ::avro::AVRO_ARRAY: {
+ const auto& element_node = avro_node->leafAt(0);
+ // skipArray() returns count like arrayStart(), must handle all blocks
+ int64_t block_count = decoder.skipArray();
+ while (block_count > 0) {
+ for (int64_t i = 0; i < block_count; ++i) {
+ ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(element_node, decoder));
+ }
+ block_count = decoder.arrayNext();
+ }
+ return {};
+ }
+
+ case ::avro::AVRO_MAP: {
+ const auto& value_node = avro_node->leafAt(1);
+ // skipMap() returns count like mapStart(), must handle all blocks
+ int64_t block_count = decoder.skipMap();
+ while (block_count > 0) {
+ for (int64_t i = 0; i < block_count; ++i) {
+ decoder.skipString(); // Skip key (always string in Avro maps)
+ ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(value_node, decoder));
+ }
+ block_count = decoder.mapNext();
+ }
+ return {};
+ }
+
+ case ::avro::AVRO_UNION: {
+ const size_t branch_index = decoder.decodeUnionIndex();
+ // Validate branch index
+ const size_t num_branches = avro_node->leaves();
+ if (branch_index >= num_branches) {
+ return InvalidArgument("Union branch index {} out of range [0, {})",
branch_index,
+ num_branches);
+ }
+ return SkipAvroValue(avro_node->leafAt(branch_index), decoder);
+ }
+
+ default:
+ return InvalidArgument("Unsupported Avro type for skipping: {}",
+ ToString(avro_node));
+ }
+}
+
+/// \brief Decode Avro record directly to Arrow struct builder.
+Status DecodeStructToBuilder(const ::avro::NodePtr& avro_node,
::avro::Decoder& decoder,
+ const std::span<const FieldProjection>&
projections,
+ const StructType& struct_type,
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
+ if (avro_node->type() != ::avro::AVRO_RECORD) {
+ return InvalidArgument("Expected Avro record, got type: {}",
ToString(avro_node));
+ }
+
+ auto* struct_builder =
internal::checked_cast<::arrow::StructBuilder*>(array_builder);
+ ICEBERG_ARROW_RETURN_NOT_OK(struct_builder->Append());
+
+ // Build a map from Avro field index to projection index (cached per struct
schema)
+ // -1 means the field should be skipped
+ const FieldProjection* cache_key = projections.data();
+ auto cache_it = ctx->avro_to_projection_cache.find(cache_key);
+ std::vector<int>* avro_to_projection;
+
+ if (cache_it != ctx->avro_to_projection_cache.end()) {
+ // Use cached mapping
+ avro_to_projection = &cache_it->second;
+ } else {
+ // Build and cache the mapping
+ auto [inserted_it, inserted] = ctx->avro_to_projection_cache.emplace(
+ cache_key, std::vector<int>(avro_node->leaves(), -1));
+ avro_to_projection = &inserted_it->second;
+
+ for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) {
+ const auto& field_projection = projections[proj_idx];
+ if (field_projection.kind == FieldProjection::Kind::kProjected) {
+ size_t avro_field_index = std::get<size_t>(field_projection.from);
+ (*avro_to_projection)[avro_field_index] = static_cast<int>(proj_idx);
+ }
+ }
+ }
+
+ // Read all Avro fields in order (must maintain decoder position)
+ for (size_t avro_idx = 0; avro_idx < avro_node->leaves(); ++avro_idx) {
+ int proj_idx = (*avro_to_projection)[avro_idx];
+
+ if (proj_idx < 0) {
+ // Skip this field - not in projection
+ const auto& avro_field_node = avro_node->leafAt(avro_idx);
+ ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_field_node, decoder));
+ } else {
+ // Decode this field
+ const auto& field_projection = projections[proj_idx];
+ const auto& expected_field = struct_type.fields()[proj_idx];
+ const auto& avro_field_node = avro_node->leafAt(avro_idx);
+ auto* field_builder = struct_builder->field_builder(proj_idx);
+
+ ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(avro_field_node, decoder,
+ field_projection,
expected_field,
+ field_builder, ctx));
+ }
+ }
+
+ // Handle null fields (fields in projection but not in Avro)
+ for (size_t proj_idx = 0; proj_idx < projections.size(); ++proj_idx) {
+ const auto& field_projection = projections[proj_idx];
+ if (field_projection.kind == FieldProjection::Kind::kNull) {
+ auto* field_builder =
struct_builder->field_builder(static_cast<int>(proj_idx));
+ ICEBERG_ARROW_RETURN_NOT_OK(field_builder->AppendNull());
+ } else if (field_projection.kind != FieldProjection::Kind::kProjected) {
+ return InvalidArgument("Unsupported field projection kind: {}",
+ static_cast<int>(field_projection.kind));
+ }
+ }
+ return {};
+}
+
+/// \brief Decode Avro array directly to Arrow list builder.
+Status DecodeListToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
+ const FieldProjection& element_projection,
+ const ListType& list_type,
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
+ if (avro_node->type() != ::avro::AVRO_ARRAY) {
+ return InvalidArgument("Expected Avro array, got type: {}",
ToString(avro_node));
+ }
+
+ auto* list_builder =
internal::checked_cast<::arrow::ListBuilder*>(array_builder);
+ ICEBERG_ARROW_RETURN_NOT_OK(list_builder->Append());
+
+ auto* value_builder = list_builder->value_builder();
+ const auto& element_node = avro_node->leafAt(0);
+ const auto& element_field = list_type.fields().back();
+
+ // Read array block count
+ int64_t block_count = decoder.arrayStart();
+ while (block_count != 0) {
+ for (int64_t i = 0; i < block_count; ++i) {
+ ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(
+ element_node, decoder, element_projection, element_field,
value_builder, ctx));
+ }
+ block_count = decoder.arrayNext();
+ }
+
+ return {};
+}
+
+/// \brief Decode Avro map directly to Arrow map builder.
+Status DecodeMapToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
+ const FieldProjection& key_projection,
+ const FieldProjection& value_projection,
+ const MapType& map_type, ::arrow::ArrayBuilder*
array_builder,
+ DecodeContext* ctx) {
+ auto* map_builder =
internal::checked_cast<::arrow::MapBuilder*>(array_builder);
+
+ if (avro_node->type() == ::avro::AVRO_MAP) {
+ // Handle regular Avro map: map<string, value>
+ const auto& key_node = avro_node->leafAt(0);
+ const auto& value_node = avro_node->leafAt(1);
+ const auto& key_field = map_type.key();
+ const auto& value_field = map_type.value();
+
+ ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append());
+ auto* key_builder = map_builder->key_builder();
+ auto* item_builder = map_builder->item_builder();
+
+ // Read map block count
+ int64_t block_count = decoder.mapStart();
+ while (block_count != 0) {
+ for (int64_t i = 0; i < block_count; ++i) {
+ ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder,
key_projection,
+ key_field, key_builder,
ctx));
+ ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(
+ value_node, decoder, value_projection, value_field, item_builder,
ctx));
+ }
+ block_count = decoder.mapNext();
+ }
+
+ return {};
+ } else if (avro_node->type() == ::avro::AVRO_ARRAY &&
HasMapLogicalType(avro_node)) {
+ // Handle array-based map: list<struct<key, value>>
+ const auto& key_field = map_type.key();
+ const auto& value_field = map_type.value();
+
+ ICEBERG_ARROW_RETURN_NOT_OK(map_builder->Append());
+ auto* key_builder = map_builder->key_builder();
+ auto* item_builder = map_builder->item_builder();
+
+ const auto& record_node = avro_node->leafAt(0);
+ if (record_node->type() != ::avro::AVRO_RECORD || record_node->leaves() !=
2) {
+ return InvalidArgument(
+ "Array-based map must contain records with exactly 2 fields, got:
{}",
+ ToString(record_node));
+ }
+ const auto& key_node = record_node->leafAt(0);
+ const auto& value_node = record_node->leafAt(1);
+
+ // Read array block count
+ int64_t block_count = decoder.arrayStart();
+ while (block_count != 0) {
+ for (int64_t i = 0; i < block_count; ++i) {
+ ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(key_node, decoder,
key_projection,
+ key_field, key_builder,
ctx));
+ ICEBERG_RETURN_UNEXPECTED(DecodeFieldToBuilder(
+ value_node, decoder, value_projection, value_field, item_builder,
ctx));
+ }
+ block_count = decoder.arrayNext();
+ }
+
+ return {};
+ } else {
+ return InvalidArgument("Expected Avro map or array with map logical type,
got: {}",
+ ToString(avro_node));
+ }
+}
+
+/// \brief Decode nested Avro data directly to Arrow array builder.
+Status DecodeNestedValueToBuilder(const ::avro::NodePtr& avro_node,
+ ::avro::Decoder& decoder,
+ const std::span<const FieldProjection>&
projections,
+ const NestedType& projected_type,
+ ::arrow::ArrayBuilder* array_builder,
+ DecodeContext* ctx) {
+ switch (projected_type.type_id()) {
+ case TypeId::kStruct: {
+ const auto& struct_type = internal::checked_cast<const
StructType&>(projected_type);
+ return DecodeStructToBuilder(avro_node, decoder, projections,
struct_type,
+ array_builder, ctx);
+ }
+
+ case TypeId::kList: {
+ if (projections.size() != 1) {
+ return InvalidArgument("Expected 1 projection for list, got: {}",
+ projections.size());
+ }
+ const auto& list_type = internal::checked_cast<const
ListType&>(projected_type);
+ return DecodeListToBuilder(avro_node, decoder, projections[0], list_type,
+ array_builder, ctx);
+ }
+
+ case TypeId::kMap: {
+ if (projections.size() != 2) {
+ return InvalidArgument("Expected 2 projections for map, got: {}",
+ projections.size());
+ }
+ const auto& map_type = internal::checked_cast<const
MapType&>(projected_type);
+ return DecodeMapToBuilder(avro_node, decoder, projections[0],
projections[1],
+ map_type, array_builder, ctx);
+ }
+
+ default:
+ return InvalidArgument("Unsupported nested type: {}",
projected_type.ToString());
+ }
+}
+
+Status DecodePrimitiveValueToBuilder(const ::avro::NodePtr& avro_node,
+ ::avro::Decoder& decoder,
+ const SchemaField& projected_field,
+ ::arrow::ArrayBuilder* array_builder,
+ DecodeContext* ctx) {
+ const auto& projected_type = *projected_field.type();
+ if (!projected_type.is_primitive()) {
+ return InvalidArgument("Expected primitive type, got: {}",
projected_type.ToString());
+ }
+
+ switch (projected_type.type_id()) {
+ case TypeId::kBoolean: {
+ if (avro_node->type() != ::avro::AVRO_BOOL) {
+ return InvalidArgument("Expected Avro boolean for boolean field, got:
{}",
+ ToString(avro_node));
+ }
+ auto* builder =
internal::checked_cast<::arrow::BooleanBuilder*>(array_builder);
+ bool value = decoder.decodeBool();
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+ return {};
+ }
+
+ case TypeId::kInt: {
+ if (avro_node->type() != ::avro::AVRO_INT) {
+ return InvalidArgument("Expected Avro int for int field, got: {}",
+ ToString(avro_node));
+ }
+ auto* builder =
internal::checked_cast<::arrow::Int32Builder*>(array_builder);
+ int32_t value = decoder.decodeInt();
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+ return {};
+ }
+
+ case TypeId::kLong: {
+ auto* builder =
internal::checked_cast<::arrow::Int64Builder*>(array_builder);
+ if (avro_node->type() == ::avro::AVRO_LONG) {
+ int64_t value = decoder.decodeLong();
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+ } else if (avro_node->type() == ::avro::AVRO_INT) {
+ int32_t value = decoder.decodeInt();
+
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<int64_t>(value)));
+ } else {
+ return InvalidArgument("Expected Avro int/long for long field, got:
{}",
+ ToString(avro_node));
+ }
+ return {};
+ }
+
+ case TypeId::kFloat: {
+ if (avro_node->type() != ::avro::AVRO_FLOAT) {
+ return InvalidArgument("Expected Avro float for float field, got: {}",
+ ToString(avro_node));
+ }
+ auto* builder =
internal::checked_cast<::arrow::FloatBuilder*>(array_builder);
+ float value = decoder.decodeFloat();
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+ return {};
+ }
+
+ case TypeId::kDouble: {
+ auto* builder =
internal::checked_cast<::arrow::DoubleBuilder*>(array_builder);
+ if (avro_node->type() == ::avro::AVRO_DOUBLE) {
+ double value = decoder.decodeDouble();
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+ } else if (avro_node->type() == ::avro::AVRO_FLOAT) {
+ float value = decoder.decodeFloat();
+
ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(static_cast<double>(value)));
+ } else {
+ return InvalidArgument("Expected Avro float/double for double field,
got: {}",
+ ToString(avro_node));
+ }
+ return {};
+ }
+
+ case TypeId::kString: {
+ if (avro_node->type() != ::avro::AVRO_STRING) {
+ return InvalidArgument("Expected Avro string for string field, got:
{}",
+ ToString(avro_node));
+ }
+ auto* builder =
internal::checked_cast<::arrow::StringBuilder*>(array_builder);
+ decoder.decodeString(ctx->string_scratch);
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->string_scratch));
+ return {};
+ }
+
+ case TypeId::kBinary: {
+ if (avro_node->type() != ::avro::AVRO_BYTES) {
+ return InvalidArgument("Expected Avro bytes for binary field, got: {}",
+ ToString(avro_node));
+ }
+ auto* builder =
internal::checked_cast<::arrow::BinaryBuilder*>(array_builder);
+ decoder.decodeBytes(ctx->bytes_scratch);
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(
+ ctx->bytes_scratch.data(),
static_cast<int32_t>(ctx->bytes_scratch.size())));
+ return {};
+ }
+
+ case TypeId::kFixed: {
+ if (avro_node->type() != ::avro::AVRO_FIXED) {
+ return InvalidArgument("Expected Avro fixed for fixed field, got: {}",
+ ToString(avro_node));
+ }
+ const auto& fixed_type = internal::checked_cast<const
FixedType&>(projected_type);
+ auto* builder =
+
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);
+
+ ctx->bytes_scratch.resize(fixed_type.length());
+ decoder.decodeFixed(fixed_type.length(), ctx->bytes_scratch);
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
+ return {};
+ }
+
+ case TypeId::kUuid: {
+ if (avro_node->type() != ::avro::AVRO_FIXED ||
+ avro_node->logicalType().type() != ::avro::LogicalType::UUID) {
+ return InvalidArgument("Expected Avro fixed for uuid field, got: {}",
+ ToString(avro_node));
+ }
+
+ auto* builder =
+
internal::checked_cast<::arrow::FixedSizeBinaryBuilder*>(array_builder);
+
+ ctx->bytes_scratch.resize(16);
+ decoder.decodeFixed(16, ctx->bytes_scratch);
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(ctx->bytes_scratch.data()));
+ return {};
+ }
+
+ case TypeId::kDecimal: {
+ if (avro_node->type() != ::avro::AVRO_FIXED ||
+ avro_node->logicalType().type() != ::avro::LogicalType::DECIMAL) {
+ return InvalidArgument(
+ "Expected Avro fixed with DECIMAL logical type for decimal field,
got: {}",
+ ToString(avro_node));
+ }
+
+ size_t byte_width = avro_node->fixedSize();
+ auto* builder =
internal::checked_cast<::arrow::Decimal128Builder*>(array_builder);
+
+ ctx->bytes_scratch.resize(byte_width);
+ decoder.decodeFixed(byte_width, ctx->bytes_scratch);
+ ICEBERG_ARROW_ASSIGN_OR_RETURN(
+ auto decimal,
::arrow::Decimal128::FromBigEndian(ctx->bytes_scratch.data(),
+
ctx->bytes_scratch.size()));
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(decimal));
+ return {};
+ }
+
+ case TypeId::kDate: {
+ if (avro_node->type() != ::avro::AVRO_INT ||
+ avro_node->logicalType().type() != ::avro::LogicalType::DATE) {
+ return InvalidArgument(
+ "Expected Avro int with DATE logical type for date field, got: {}",
+ ToString(avro_node));
+ }
+ auto* builder =
internal::checked_cast<::arrow::Date32Builder*>(array_builder);
+ int32_t value = decoder.decodeInt();
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+ return {};
+ }
+
+ case TypeId::kTime: {
+ if (avro_node->type() != ::avro::AVRO_LONG ||
+ avro_node->logicalType().type() != ::avro::LogicalType::TIME_MICROS)
{
+ return InvalidArgument(
+ "Expected Avro long with TIME_MICROS for time field, got: {}",
+ ToString(avro_node));
+ }
+ auto* builder =
internal::checked_cast<::arrow::Time64Builder*>(array_builder);
+ int64_t value = decoder.decodeLong();
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+ return {};
+ }
+
+ case TypeId::kTimestamp:
+ case TypeId::kTimestampTz: {
+ if (avro_node->type() != ::avro::AVRO_LONG ||
+ avro_node->logicalType().type() !=
::avro::LogicalType::TIMESTAMP_MICROS) {
+ return InvalidArgument(
+ "Expected Avro long with TIMESTAMP_MICROS for timestamp field,
got: {}",
+ ToString(avro_node));
+ }
+ auto* builder =
internal::checked_cast<::arrow::TimestampBuilder*>(array_builder);
+ int64_t value = decoder.decodeLong();
+ ICEBERG_ARROW_RETURN_NOT_OK(builder->Append(value));
+ return {};
+ }
+
+ default:
+ return InvalidArgument("Unsupported primitive type {} to decode from
avro node {}",
+ projected_field.type()->ToString(),
ToString(avro_node));
+ }
+}
+
+/// \brief Dispatch to appropriate handlers based on the projection kind.
+Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
+ const FieldProjection& projection,
+ const SchemaField& projected_field,
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
+ if (avro_node->type() == ::avro::AVRO_UNION) {
+ const size_t branch_index = decoder.decodeUnionIndex();
+
+ // Validate branch index
+ const size_t num_branches = avro_node->leaves();
+ if (branch_index >= num_branches) {
+ return InvalidArgument("Union branch index {} out of range [0, {})",
branch_index,
+ num_branches);
+ }
+
+ const auto& branch_node = avro_node->leafAt(branch_index);
+ if (branch_node->type() == ::avro::AVRO_NULL) {
+ ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull());
+ return {};
+ } else {
+ return DecodeFieldToBuilder(branch_node, decoder, projection,
projected_field,
+ array_builder, ctx);
+ }
+ }
+
+ const auto& projected_type = *projected_field.type();
+ if (projected_type.is_primitive()) {
+ return DecodePrimitiveValueToBuilder(avro_node, decoder, projected_field,
+ array_builder, ctx);
+ } else {
+ const auto& nested_type = internal::checked_cast<const
NestedType&>(projected_type);
+ return DecodeNestedValueToBuilder(avro_node, decoder, projection.children,
+ nested_type, array_builder, ctx);
+ }
+}
+
+} // namespace
+
+Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
+ const SchemaProjection& projection,
+ const Schema& projected_schema,
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx) {
+ return DecodeNestedValueToBuilder(avro_node, decoder, projection.fields,
+ projected_schema, array_builder, ctx);
+}
+
+} // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_direct_decoder_internal.h
b/src/iceberg/avro/avro_direct_decoder_internal.h
new file mode 100644
index 00000000..df4587fd
--- /dev/null
+++ b/src/iceberg/avro/avro_direct_decoder_internal.h
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <memory>
+#include <span>
+
+#include <arrow/array/builder_base.h>
+#include <avro/Decoder.hh>
+#include <avro/Node.hh>
+
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/schema_util.h"
+
+namespace iceberg::avro {
+
+/// \brief Context for reusing scratch buffers during Avro decoding
+///
+/// Avoids frequent small allocations by reusing temporary buffers across
+/// multiple decode operations. This is particularly important for string,
+/// binary, and fixed-size data types.
+struct DecodeContext {
+ // Scratch buffer for string decoding (reused across rows)
+ std::string string_scratch;
+ // Scratch buffer for binary/fixed/uuid/decimal data (reused across rows)
+ std::vector<uint8_t> bytes_scratch;
+ // Cache for avro field index to projection index mapping
+ // Key: pointer to projections array (identifies struct schema)
+ // Value: vector mapping avro field index -> projection index (-1 if not
projected)
+ std::unordered_map<const FieldProjection*, std::vector<int>>
avro_to_projection_cache;
+};
+
+/// \brief Directly decode Avro data to Arrow array builders without
GenericDatum
+///
+/// Eliminates the GenericDatum intermediate layer by directly calling Avro
decoder
+/// methods and immediately appending to Arrow builders. Matches Java Iceberg's
+/// ValueReader approach for better performance.
+///
+/// Features:
+/// - All primitive, temporal, and logical types
+/// - Nested types (struct, list, map)
+/// - Union types with bounds checking
+/// - Field skipping for schema projection
+///
+/// Schema Evolution:
+/// Schema resolution is handled via SchemaProjection (from Project()
function).
+/// Supports field reordering and missing fields (set to NULL). Default values
+/// are NOT currently supported.
+///
+/// Error Handling:
+/// - Type mismatches → InvalidArgument
+/// - Union branch out of range → InvalidArgument
+/// - Decimal precision insufficient → InvalidArgument
+/// - Missing logical type → InvalidArgument
+///
+/// \param avro_node The Avro schema node for the data being decoded
+/// \param decoder The Avro decoder positioned at the data to read
+/// \param projection The field projections (from Project() function)
+/// \param projected_schema The target Iceberg schema after projection
+/// \param array_builder The Arrow array builder to append decoded data to
+/// \param ctx Decode context for reusing scratch buffers
+/// \return Status::OK if successful, or an error status
+Status DecodeAvroToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder&
decoder,
+ const SchemaProjection& projection,
+ const Schema& projected_schema,
+ ::arrow::ArrayBuilder* array_builder,
DecodeContext* ctx);
+
+} // namespace iceberg::avro
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 932dd0f1..fa7337f0 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -34,6 +34,7 @@
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
+#include "iceberg/avro/avro_direct_decoder_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/avro/avro_stream_internal.h"
@@ -62,18 +63,19 @@ Result<std::unique_ptr<AvroInputStream>>
CreateInputStream(const ReaderOptions&
// A stateful context to keep track of the reading progress.
struct ReadContext {
- // The datum to reuse for reading the data.
- std::unique_ptr<::avro::GenericDatum> datum_;
// The arrow schema to build the record batch.
std::shared_ptr<::arrow::Schema> arrow_schema_;
// The builder to build the record batch.
std::shared_ptr<::arrow::ArrayBuilder> builder_;
+ // GenericDatum for GenericDatum-based decoding (only used if direct decoder
is
+ // disabled)
+ std::unique_ptr<::avro::GenericDatum> datum_;
+ // Decode context for reusing scratch buffers (only used if direct decoder is
+ // enabled)
+ DecodeContext decode_context_;
};
-// TODO(gang.wu): there are a lot to do to make this reader work.
-// 1. prune the reader schema based on the projection
-// 2. read key-value metadata from the avro file
-// 3. collect basic reader metrics
+// TODO(gang.wu): collect basic reader metrics
class AvroReader::Impl {
public:
Status Open(const ReaderOptions& options) {
@@ -83,6 +85,7 @@ class AvroReader::Impl {
}
batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
+ use_direct_decoder_ =
options.properties->Get(ReaderProperties::kAvroSkipDatum);
read_schema_ = options.projection;
// Open the input stream and adapt to the avro interface.
@@ -91,10 +94,21 @@ class AvroReader::Impl {
ICEBERG_ASSIGN_OR_RAISE(auto input_stream,
CreateInputStream(options, kDefaultBufferSize));
- // Create a base reader without setting reader schema to enable projection.
- auto base_reader =
- std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
- ::avro::ValidSchema file_schema = base_reader->dataSchema();
+ ::avro::ValidSchema file_schema;
+
+ if (use_direct_decoder_) {
+ // Create base reader for direct decoder access
+ auto base_reader =
+
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
+ file_schema = base_reader->dataSchema();
+ base_reader_ = std::move(base_reader);
+ } else {
+ // Create DataFileReader<GenericDatum> for GenericDatum-based decoding
+ auto datum_reader =
std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
+ std::move(input_stream));
+ file_schema = datum_reader->dataSchema();
+ datum_reader_ = std::move(datum_reader);
+ }
// Validate field ids in the file schema.
HasIdVisitor has_id_visitor;
@@ -121,13 +135,22 @@ class AvroReader::Impl {
// TODO(gangwu): support pruning source fields
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_,
file_schema.root(),
/*prune_source=*/false));
- reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
- std::move(base_reader), file_schema);
- if (options.split) {
- reader_->sync(options.split->offset);
- split_end_ = options.split->offset + options.split->length;
+ if (use_direct_decoder_) {
+ // Initialize the base reader with the file schema
+ base_reader_->init(file_schema);
+ if (options.split) {
+ base_reader_->sync(options.split->offset);
+ split_end_ = options.split->offset + options.split->length;
+ }
+ } else {
+ // The datum reader is already initialized during construction
+ if (options.split) {
+ datum_reader_->sync(options.split->offset);
+ split_end_ = options.split->offset + options.split->length;
+ }
}
+
return {};
}
@@ -137,25 +160,37 @@ class AvroReader::Impl {
}
while (context_->builder_->length() < batch_size_) {
- if (split_end_ && reader_->pastSync(split_end_.value())) {
+ if (IsPastSync()) {
break;
}
- if (!reader_->read(*context_->datum_)) {
- break;
+
+ if (use_direct_decoder_) {
+ // Direct decoder: decode Avro to Arrow without GenericDatum
+ if (!base_reader_->hasMore()) {
+ break;
+ }
+ base_reader_->decr();
+
+ ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder(
+ GetReaderSchema().root(), base_reader_->decoder(), projection_,
*read_schema_,
+ context_->builder_.get(), &context_->decode_context_));
+ } else {
+ // GenericDatum-based decoding: decode via GenericDatum intermediate
+ if (!datum_reader_->read(*context_->datum_)) {
+ break;
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(
+ AppendDatumToBuilder(GetReaderSchema().root(), *context_->datum_,
projection_,
+ *read_schema_, context_->builder_.get()));
}
- ICEBERG_RETURN_UNEXPECTED(
- AppendDatumToBuilder(reader_->readerSchema().root(),
*context_->datum_,
- projection_, *read_schema_,
context_->builder_.get()));
}
return ConvertBuilderToArrowArray();
}
Status Close() {
- if (reader_ != nullptr) {
- reader_->close();
- reader_.reset();
- }
+ CloseReader();
context_.reset();
return {};
}
@@ -174,12 +209,12 @@ class AvroReader::Impl {
}
Result<std::unordered_map<std::string, std::string>> Metadata() {
- if (reader_ == nullptr) {
+ if ((use_direct_decoder_ && !base_reader_) ||
+ (!use_direct_decoder_ && !datum_reader_)) {
return Invalid("Reader is not opened");
}
- const auto& metadata = reader_->metadata();
-
+ const auto& metadata = GetReaderMetadata();
std::unordered_map<std::string, std::string> metadata_map;
metadata_map.reserve(metadata.size());
@@ -194,7 +229,6 @@ class AvroReader::Impl {
private:
Status InitReadContext() {
context_ = std::make_unique<ReadContext>();
- context_->datum_ =
std::make_unique<::avro::GenericDatum>(reader_->readerSchema());
ArrowSchema arrow_schema;
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*read_schema_, &arrow_schema));
@@ -214,6 +248,11 @@ class AvroReader::Impl {
}
context_->builder_ = builder_result.MoveValueUnsafe();
+ // Initialize GenericDatum for GenericDatum-based decoding
+ if (!use_direct_decoder_) {
+ context_->datum_ =
std::make_unique<::avro::GenericDatum>(GetReaderSchema());
+ }
+
return {};
}
@@ -238,17 +277,52 @@ class AvroReader::Impl {
return arrow_array;
}
+ bool IsPastSync() const {
+ if (!split_end_) {
+ return false;
+ }
+ return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value())
+ : datum_reader_->pastSync(split_end_.value());
+ }
+
+ const ::avro::Metadata& GetReaderMetadata() const {
+ return use_direct_decoder_ ? base_reader_->metadata() :
datum_reader_->metadata();
+ }
+
+ void CloseReader() {
+ if (use_direct_decoder_) {
+ if (base_reader_) {
+ base_reader_->close();
+ base_reader_.reset();
+ }
+ } else {
+ if (datum_reader_) {
+ datum_reader_->close();
+ datum_reader_.reset();
+ }
+ }
+ }
+
+ const ::avro::ValidSchema& GetReaderSchema() const {
+ return use_direct_decoder_ ? base_reader_->readerSchema()
+ : datum_reader_->readerSchema();
+ }
+
private:
// Max number of rows in the record batch to read.
int64_t batch_size_{};
+ // Whether to use direct decoder (true) or GenericDatum-based decoder
(false).
+ bool use_direct_decoder_{true};
// The end of the split to read and used to terminate the reading.
std::optional<int64_t> split_end_;
// The schema to read.
std::shared_ptr<::iceberg::Schema> read_schema_;
// The projection result to apply to the read schema.
SchemaProjection projection_;
- // The avro reader to read the data into a datum.
- std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
+ // The avro reader base - provides direct access to decoder for direct
decoding.
+ std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
+ // The datum reader for GenericDatum-based decoding.
+ std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
// The context to keep track of the reading progress.
std::unique_ptr<ReadContext> context_;
};
diff --git a/src/iceberg/avro/avro_scan.cc b/src/iceberg/avro/avro_scan.cc
new file mode 100644
index 00000000..3e690360
--- /dev/null
+++ b/src/iceberg/avro/avro_scan.cc
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <chrono>
+#include <iostream>
+#include <memory>
+#include <string>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/filesystem/localfs.h>
+#include <arrow/type.h>
+
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/schema.h"
+
+void PrintUsage(const char* program_name) {
+ std::cerr << "Usage: " << program_name << " [options] <avro_file>\n"
+ << "Options:\n"
+ << " --skip-datum=<true|false> Use direct decoder (default:
true)\n"
+ << " --batch-size=<N> Batch size for reading (default:
4096)\n"
+ << " --help Show this help message\n"
+ << "\nExample:\n"
+ << " " << program_name
+ << " --skip-datum=false --batch-size=1000 data.avro\n";
+}
+
+int main(int argc, char* argv[]) {
+ iceberg::avro::RegisterAll();
+
+ if (argc < 2) {
+ PrintUsage(argv[0]);
+ return 1;
+ }
+
+ std::string avro_file;
+ bool skip_datum = true;
+ int64_t batch_size = 4096;
+
+ // Parse arguments
+ for (int i = 1; i < argc; ++i) {
+ std::string arg = argv[i];
+ if (arg == "--help") {
+ PrintUsage(argv[0]);
+ return 0;
+ } else if (arg.starts_with("--skip-datum=")) {
+ std::string value = arg.substr(13);
+ if (value == "true" || value == "1") {
+ skip_datum = true;
+ } else if (value == "false" || value == "0") {
+ skip_datum = false;
+ } else {
+ std::cerr << "Invalid value for --skip-datum: " << value << "\n";
+ return 1;
+ }
+ } else if (arg.starts_with("--batch-size=")) {
+ batch_size = std::stoll(arg.substr(13));
+ if (batch_size <= 0) {
+ std::cerr << "Batch size must be positive\n";
+ return 1;
+ }
+ } else if (arg[0] == '-') {
+ std::cerr << "Unknown option: " << arg << "\n";
+ PrintUsage(argv[0]);
+ return 1;
+ } else {
+ avro_file = arg;
+ }
+ }
+
+ if (avro_file.empty()) {
+ std::cerr << "Error: No Avro file specified\n";
+ PrintUsage(argv[0]);
+ return 1;
+ }
+
+ std::cout << "Scanning Avro file: " << avro_file << "\n";
+ std::cout << "Skip datum: " << (skip_datum ? "true" : "false") << "\n";
+ std::cout << "Batch size: " << batch_size << "\n";
+ std::cout << std::string(60, '-') << "\n";
+
+ auto local_fs = std::make_shared<::arrow::fs::LocalFileSystem>();
+ auto file_io =
std::make_shared<iceberg::arrow::ArrowFileSystemFileIO>(local_fs);
+
+ // Get file info
+ auto file_info_result = local_fs->GetFileInfo(avro_file);
+ if (!file_info_result.ok()) {
+ std::cerr << "Error: Cannot access file: " <<
file_info_result.status().message()
+ << "\n";
+ return 1;
+ }
+ auto file_info = file_info_result.ValueOrDie();
+ if (file_info.type() != ::arrow::fs::FileType::File) {
+ std::cerr << "Error: Not a file: " << avro_file << "\n";
+ return 1;
+ }
+
+ std::cout << "File size: " << file_info.size() << " bytes\n";
+
+ // Configure reader properties
+ auto reader_properties = iceberg::ReaderProperties::default_properties();
+ reader_properties->Set(iceberg::ReaderProperties::kAvroSkipDatum,
skip_datum);
+ reader_properties->Set(iceberg::ReaderProperties::kBatchSize, batch_size);
+
+ // Open reader (without projection to read all columns)
+ auto reader_result = iceberg::ReaderFactoryRegistry::Open(
+ iceberg::FileFormatType::kAvro, {.path = avro_file,
+ .length = file_info.size(),
+ .io = file_io,
+ .projection = nullptr,
+ .properties =
std::move(reader_properties)});
+
+ if (!reader_result.has_value()) {
+ std::cerr << "Error opening reader: " << reader_result.error().message <<
"\n";
+ return 1;
+ }
+
+ auto reader = std::move(reader_result.value());
+
+ // Get schema
+ auto schema_result = reader->Schema();
+ if (!schema_result.has_value()) {
+ std::cerr << "Error getting schema: " << schema_result.error().message <<
"\n";
+ return 1;
+ }
+ auto arrow_schema = schema_result.value();
+ auto arrow_schema_import = ::arrow::ImportType(&arrow_schema);
+ if (!arrow_schema_import.ok()) {
+ std::cerr << "Error importing schema: " <<
arrow_schema_import.status().message()
+ << "\n";
+ return 1;
+ }
+ std::cout << "Schema: " << arrow_schema_import.ValueOrDie()->ToString() <<
"\n";
+ std::cout << std::string(60, '-') << "\n";
+
+ // Scan file and measure time
+ auto start = std::chrono::high_resolution_clock::now();
+
+ int64_t total_rows = 0;
+ int64_t batch_count = 0;
+
+ while (true) {
+ auto batch_result = reader->Next();
+ if (!batch_result.has_value()) {
+ std::cerr << "Error reading batch: " << batch_result.error().message <<
"\n";
+ return 1;
+ }
+
+ auto batch_opt = batch_result.value();
+ if (!batch_opt.has_value()) {
+ // End of file
+ break;
+ }
+
+ auto arrow_array = batch_opt.value();
+ auto arrow_type = arrow_schema_import.ValueOrDie();
+ auto array_import = ::arrow::ImportArray(&arrow_array, arrow_type);
+ if (!array_import.ok()) {
+ std::cerr << "Error importing array: " <<
array_import.status().message() << "\n";
+ return 1;
+ }
+
+ int64_t batch_rows = array_import.ValueOrDie()->length();
+ total_rows += batch_rows;
+ batch_count++;
+ }
+
+ auto end = std::chrono::high_resolution_clock::now();
+ auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end -
start);
+
+ // Print results
+ std::cout << "\nResults:\n";
+ std::cout << " Total rows: " << total_rows << "\n";
+ std::cout << " Batches: " << batch_count << "\n";
+ std::cout << " Time: " << duration.count() << " ms\n";
+ std::cout << " Throughput: "
+ << (duration.count() > 0 ? (total_rows * 1000 / duration.count())
: 0)
+ << " rows/sec\n";
+ std::cout << " Throughput: "
+ << (duration.count() > 0
+ ? (file_info.size() / 1024.0 / 1024.0) / (duration.count()
/ 1000.0)
+ : 0)
+ << " MB/sec\n";
+
+ return 0;
+}
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index 85221a6a..a5af0a41 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -76,6 +76,11 @@ class ReaderProperties : public ConfigBase<ReaderProperties>
{
/// \brief The batch size to read.
inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
+ /// \brief Skip GenericDatum in the Avro reader for better performance.
+ /// When true, decode directly from Avro to Arrow without the GenericDatum
intermediate representation.
+ /// Default: true.
+
/// \brief Create a default ReaderProperties instance.
static std::unique_ptr<ReaderProperties> default_properties();
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 1d421ede..c1bb8bc9 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -17,6 +17,8 @@
* under the License.
*/
+#include <sstream>
+
#include <arrow/array/array_base.h>
#include <arrow/c/bridge.h>
#include <arrow/filesystem/localfs.h>
@@ -51,6 +53,8 @@ class AvroReaderTest : public TempFileTestBase {
temp_avro_file_ = CreateNewTempFilePathWithSuffix(".avro");
}
+ bool skip_datum_{true};
+
void CreateSimpleAvroFile() {
const std::string avro_schema_json = R"({
"type": "record",
@@ -139,11 +143,15 @@ class AvroReaderTest : public TempFileTestBase {
ASSERT_TRUE(file_info_result.ok());
ASSERT_EQ(file_info_result->size(), writer->length().value());
- auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro,
- {.path = temp_avro_file_,
- .length =
file_info_result->size(),
- .io = file_io_,
- .projection = schema});
+ auto reader_properties = ReaderProperties::default_properties();
+ reader_properties->Set(ReaderProperties::kAvroSkipDatum, skip_datum_);
+
+ auto reader_result = ReaderFactoryRegistry::Open(
+ FileFormatType::kAvro, {.path = temp_avro_file_,
+ .length = file_info_result->size(),
+ .io = file_io_,
+ .projection = schema,
+ .properties = std::move(reader_properties)});
ASSERT_THAT(reader_result, IsOk());
auto reader = std::move(reader_result.value());
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
@@ -164,6 +172,16 @@ class AvroReaderTest : public TempFileTestBase {
std::string temp_avro_file_;
};
+// Parameterized test fixture for testing both DirectDecoder and GenericDatum
modes
+class AvroReaderParameterizedTest : public AvroReaderTest,
+ public ::testing::WithParamInterface<bool>
{
+ protected:
+ void SetUp() override {
+ AvroReaderTest::SetUp();
+ skip_datum_ = GetParam();
+ }
+};
+
TEST_F(AvroReaderTest, ReadTwoFields) {
CreateSimpleAvroFile();
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
@@ -220,7 +238,7 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
}
-TEST_F(AvroReaderTest, AvroWriterBasicType) {
+TEST_P(AvroReaderParameterizedTest, AvroWriterBasicType) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "name", std::make_shared<StringType>())});
@@ -229,7 +247,7 @@ TEST_F(AvroReaderTest, AvroWriterBasicType) {
WriteAndVerify(schema, expected_string);
}
-TEST_F(AvroReaderTest, AvroWriterNestedType) {
+TEST_P(AvroReaderParameterizedTest, AvroWriterNestedType) {
auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
SchemaField::MakeRequired(
@@ -244,4 +262,245 @@ TEST_F(AvroReaderTest, AvroWriterNestedType) {
WriteAndVerify(schema, expected_string);
}
+TEST_P(AvroReaderParameterizedTest, AllPrimitiveTypes) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "bool_col",
std::make_shared<BooleanType>()),
+ SchemaField::MakeRequired(2, "int_col", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(3, "long_col", std::make_shared<LongType>()),
+ SchemaField::MakeRequired(4, "float_col", std::make_shared<FloatType>()),
+ SchemaField::MakeRequired(5, "double_col",
std::make_shared<DoubleType>()),
+ SchemaField::MakeRequired(6, "string_col",
std::make_shared<StringType>()),
+ SchemaField::MakeRequired(7, "binary_col",
std::make_shared<BinaryType>())});
+
+ std::string expected_string = R"([
+ [true, 42, 1234567890, 3.14, 2.71828, "test", "AQID"],
+ [false, -100, -9876543210, -1.5, 0.0, "hello", "BAUG"]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+// DecimalType test is skipped: it requires a specific decimal encoding in JSON.
+
+TEST_P(AvroReaderParameterizedTest, DateTimeTypes) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "date_col", std::make_shared<DateType>()),
+ SchemaField::MakeRequired(2, "time_col", std::make_shared<TimeType>()),
+ SchemaField::MakeRequired(3, "timestamp_col",
std::make_shared<TimestampType>())});
+
+ // Dates as days since epoch, time/timestamps as microseconds
+ std::string expected_string = R"([
+ [18628, 43200000000, 1640995200000000],
+ [18629, 86399000000, 1641081599000000]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+TEST_P(AvroReaderParameterizedTest, NestedStruct) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(
+ 2, "person",
+ std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(3, "name",
std::make_shared<StringType>()),
+ SchemaField::MakeRequired(4, "age", std::make_shared<IntType>()),
+ SchemaField::MakeOptional(
+ 5, "address",
+
std::make_shared<iceberg::StructType>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(6, "street",
+
std::make_shared<StringType>()),
+ SchemaField::MakeRequired(7, "city",
+
std::make_shared<StringType>())}))}))});
+
+ std::string expected_string = R"([
+ [1, ["Alice", 30, ["123 Main St", "NYC"]]],
+ [2, ["Bob", 25, ["456 Oak Ave", "LA"]]]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+TEST_P(AvroReaderParameterizedTest, ListType) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(2, "tags",
+
std::make_shared<ListType>(SchemaField::MakeRequired(
+ 3, "element",
std::make_shared<StringType>())))});
+
+ std::string expected_string = R"([
+ [1, ["tag1", "tag2", "tag3"]],
+ [2, ["foo", "bar"]],
+ [3, []]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+TEST_P(AvroReaderParameterizedTest, MapType) {
+ auto schema = std::make_shared<iceberg::Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(
+ 1, "properties",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(2, "key",
std::make_shared<StringType>()),
+ SchemaField::MakeRequired(3, "value",
std::make_shared<IntType>())))});
+
+ std::string expected_string = R"([
+ [[["key1", 100], ["key2", 200]]],
+ [[["a", 1], ["b", 2], ["c", 3]]]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+TEST_P(AvroReaderParameterizedTest, MapTypeWithNonStringKey) {
+ auto schema = std::make_shared<iceberg::Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(
+ 1, "int_map",
+ std::make_shared<MapType>(
+ SchemaField::MakeRequired(2, "key", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(3, "value",
std::make_shared<StringType>())))});
+
+ std::string expected_string = R"([
+ [[[1, "one"], [2, "two"], [3, "three"]]],
+ [[[10, "ten"], [20, "twenty"]]]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+TEST_F(AvroReaderTest, ProjectionSubsetAndReorder) {
+ // Write file with full schema
+ auto write_schema =
std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(2, "name", std::make_shared<StringType>()),
+ SchemaField::MakeRequired(3, "age", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(4, "city", std::make_shared<StringType>())});
+
+ std::string write_data = R"([
+ [1, "Alice", 25, "NYC"],
+ [2, "Bob", 30, "SF"],
+ [3, "Charlie", 35, "LA"]
+ ])";
+
+ // Write with full schema
+ ArrowSchema arrow_c_schema;
+ ASSERT_THAT(ToArrowSchema(*write_schema, &arrow_c_schema), IsOk());
+ auto arrow_schema_result = ::arrow::ImportType(&arrow_c_schema);
+ ASSERT_TRUE(arrow_schema_result.ok());
+ auto arrow_schema = arrow_schema_result.ValueOrDie();
+
+ auto array_result = ::arrow::json::ArrayFromJSONString(arrow_schema,
write_data);
+ ASSERT_TRUE(array_result.ok());
+ auto array = array_result.ValueOrDie();
+
+ struct ArrowArray arrow_array;
+ auto export_result = ::arrow::ExportArray(*array, &arrow_array);
+ ASSERT_TRUE(export_result.ok());
+
+ std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}};
+ auto writer_result =
+ WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path =
temp_avro_file_,
+ .schema =
write_schema,
+ .io = file_io_,
+ .metadata =
metadata});
+ ASSERT_TRUE(writer_result.has_value());
+ auto writer = std::move(writer_result.value());
+ ASSERT_THAT(writer->Write(&arrow_array), IsOk());
+ ASSERT_THAT(writer->Close(), IsOk());
+
+ // Read with projected schema: subset of columns (city, id) in different
order
+ auto read_schema =
std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(4, "city", std::make_shared<StringType>()),
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>())});
+
+ auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
+ ASSERT_TRUE(file_info_result.ok());
+
+ auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kAvro,
+ {.path = temp_avro_file_,
+ .length =
file_info_result->size(),
+ .io = file_io_,
+ .projection =
read_schema});
+ ASSERT_THAT(reader_result, IsOk());
+ auto reader = std::move(reader_result.value());
+
+ // Verify reordered subset
+ ASSERT_NO_FATAL_FAILURE(
+ VerifyNextBatch(*reader, R"([["NYC", 1], ["SF", 2], ["LA", 3]])"));
+ ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
+}
+
+TEST_P(AvroReaderParameterizedTest, ComplexNestedTypes) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(2, "nested_list",
+
std::make_shared<ListType>(SchemaField::MakeRequired(
+ 3, "element",
+
std::make_shared<ListType>(SchemaField::MakeRequired(
+ 4, "element",
std::make_shared<IntType>())))))});
+
+ std::string expected_string = R"([
+ [1, [[1, 2], [3, 4]]],
+ [2, [[5], [6, 7, 8]]]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+TEST_P(AvroReaderParameterizedTest, OptionalFieldsWithNulls) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeOptional(2, "name", std::make_shared<StringType>()),
+ SchemaField::MakeOptional(3, "age", std::make_shared<IntType>())});
+
+ std::string expected_string = R"([
+ [1, "Alice", 30],
+ [2, null, 25],
+ [3, "Charlie", null],
+ [4, null, null]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+// Test both direct decoder and GenericDatum paths
+TEST_P(AvroReaderParameterizedTest, LargeDataset) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<LongType>()),
+ SchemaField::MakeRequired(2, "value", std::make_shared<DoubleType>())});
+
+ // Generate large dataset JSON
+ std::ostringstream json;
+ json << "[";
+ for (int i = 0; i < 1000; ++i) {
+ if (i > 0) json << ", ";
+ json << "[" << i << ", " << (i * 1.5) << "]";
+ }
+ json << "]";
+
+ WriteAndVerify(schema, json.str());
+}
+
+TEST_P(AvroReaderParameterizedTest, EmptyCollections) {
+ auto schema = std::make_shared<iceberg::Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", std::make_shared<IntType>()),
+ SchemaField::MakeRequired(2, "list_col",
+
std::make_shared<ListType>(SchemaField::MakeRequired(
+ 3, "element",
std::make_shared<IntType>())))});
+
+ std::string expected_string = R"([
+ [1, []],
+ [2, [10, 20, 30]]
+ ])";
+
+ WriteAndVerify(schema, expected_string);
+}
+
+INSTANTIATE_TEST_SUITE_P(DirectDecoderModes, AvroReaderParameterizedTest,
+ ::testing::Bool(),
+ [](const ::testing::TestParamInfo<bool>& info) {
+ return info.param ? "DirectDecoder" :
"GenericDatum";
+ });
+
} // namespace iceberg::avro
diff --git a/src/iceberg/test/temp_file_test_base.h
b/src/iceberg/test/temp_file_test_base.h
index 8e20e2ca..4b3131d1 100644
--- a/src/iceberg/test/temp_file_test_base.h
+++ b/src/iceberg/test/temp_file_test_base.h
@@ -118,7 +118,14 @@ class TempFileTestBase : public ::testing::Test {
/// \brief Get the test name for inclusion in the filename
std::string TestInfo() const {
if (const auto info =
::testing::UnitTest::GetInstance()->current_test_info(); info) {
- return std::format("{}_{}", info->test_suite_name(), info->name());
+ std::string result = std::format("{}_{}", info->test_suite_name(),
info->name());
+ // Replace slashes (from parameterized tests) with underscores to avoid
path issues
+ for (auto& c : result) {
+ if (c == '/') {
+ c = '_';
+ }
+ }
+ return result;
}
return "unknown_test";
}