This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 95045a80 refactor(avro): use polymorphism for reader/writer backends
(#458)
95045a80 is described below
commit 95045a809e54d1b96f27853203b2de3e42d44f4f
Author: Gang Wu <[email protected]>
AuthorDate: Tue Dec 30 19:33:42 2025 +0800
refactor(avro): use polymorphism for reader/writer backends (#458)
Refactor AvroReader and AvroWriter to use the strategy pattern for
handling the different encoding/decoding backends. This replaces the
previous if/else branching on the skip-datum option with dedicated backend
implementations (direct encoder/decoder vs. GenericDatum), improving code
structure and maintainability.
---
src/iceberg/avro/avro_reader.cc | 250 +++++++++++++++++++++++++---------------
src/iceberg/avro/avro_writer.cc | 165 ++++++++++++++++----------
2 files changed, 262 insertions(+), 153 deletions(-)
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 964f6d1d..aff90625 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -59,7 +59,138 @@ Result<std::unique_ptr<AvroInputStream>>
CreateInputStream(const ReaderOptions&
return std::make_unique<AvroInputStream>(file, buffer_size);
}
-} // namespace
+// Abstract interface for the Avro read backends. AvroReader::Impl selects an
+// implementation from ReaderProperties::kAvroSkipDatum and drives it as:
+// Init() -> InitWithSchema() -> InitReadContext(), then HasMore()/DecodeNext()
+// per record, with IsPastSync() bounding reads of a split.
+class AvroReadBackend {
+ public:
+ virtual ~AvroReadBackend() = default;
+ // Opens the underlying Avro file reader on `input_stream` and returns the
+ // file's data schema.
+ virtual Result<::avro::ValidSchema> Init(
+ std::unique_ptr<AvroInputStream> input_stream) = 0;
+ // Completes setup once the file schema is known; when `split` is set,
+ // positions the reader at the split's start offset.
+ virtual Status InitWithSchema(const ::avro::ValidSchema& file_schema,
+ const std::optional<Split>& split) = 0;
+ // Prepares per-read state for decoding records of `reader_schema`
+ // (may be a no-op).
+ virtual void InitReadContext(const ::avro::ValidSchema& reader_schema) = 0;
+ // Returns whether another record is available. Implementations may read the
+ // next record here, so call DecodeNext() exactly once after each true result.
+ virtual bool HasMore() = 0;
+ // Decodes the current record, projected to `read_schema` via `projection`,
+ // and appends it to `builder`.
+ virtual Status DecodeNext(const SchemaProjection& projection, const Schema&
read_schema,
+ ::arrow::ArrayBuilder* builder) = 0;
+ // Delegates to the Avro reader's pastSync(); AvroReader::Impl uses it to stop
+ // reading at the end of a split.
+ virtual bool IsPastSync(int64_t split_end) const = 0;
+ // Returns the metadata of the open Avro file.
+ virtual const ::avro::Metadata& GetMetadata() const = 0;
+ // Returns the schema records are decoded with.
+ virtual const ::avro::ValidSchema& GetReaderSchema() const = 0;
+ // Closes and releases the underlying reader.
+ virtual void Close() = 0;
+ // True once the underlying reader has been released (or was never created).
+ virtual bool Closed() const = 0;
+};
+
+// Backend implementation using direct Avro decoder: records are decoded
+// straight from the DataFileReaderBase decoder into the Arrow builder, without
+// an intermediate avro::GenericDatum.
+class DirectDecoderBackend : public AvroReadBackend {
+ public:
+ Result<::avro::ValidSchema> Init(
+ std::unique_ptr<AvroInputStream> input_stream) override {
+ reader_ =
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
+ return reader_->dataSchema();
+ }
+
+ // Initializes the base reader with the file schema and, for a split, seeks
+ // to the split's start offset.
+ Status InitWithSchema(const ::avro::ValidSchema& file_schema,
+ const std::optional<Split>& split) override {
+ reader_->init(file_schema);
+ if (split) {
+ reader_->sync(split->offset);
+ }
+ return {};
+ }
+
+ // No per-read state is needed; records are decoded directly into the builder.
+ void InitReadContext(const ::avro::ValidSchema&) override {}
+
+ // The record count is decremented in DecodeNext() via decr(), not here.
+ bool HasMore() override { return reader_->hasMore(); }
+
+ // Marks one record as consumed (decr()), then decodes it from the reader's
+ // decoder, reusing decode_context_ scratch buffers.
+ Status DecodeNext(const SchemaProjection& projection, const Schema&
read_schema,
+ ::arrow::ArrayBuilder* builder) override {
+ reader_->decr();
+ return DecodeAvroToBuilder(GetReaderSchema().root(), reader_->decoder(),
projection,
+ read_schema, builder, decode_context_);
+ }
+
+ bool IsPastSync(int64_t split_end) const override {
+ return reader_->pastSync(split_end);
+ }
+
+ const ::avro::Metadata& GetMetadata() const override { return
reader_->metadata(); }
+
+ const ::avro::ValidSchema& GetReaderSchema() const override {
+ return reader_->readerSchema();
+ }
+
+ // Safe to call repeatedly: does nothing once the reader has been released.
+ void Close() override {
+ if (reader_) {
+ reader_->close();
+ reader_.reset();
+ }
+ }
+
+ bool Closed() const override { return reader_ == nullptr; }
+
+ private:
+ std::unique_ptr<::avro::DataFileReaderBase> reader_;
+ // Decode context for reusing scratch buffers
+ DecodeContext decode_context_;
+};
+
+// Backend implementation using avro::GenericDatum as the intermediate
+// representation: each record is read into a reusable GenericDatum and then
+// appended to the Arrow builder.
+//
+// HasMore() reads the next record into datum_ and DecodeNext() consumes it.
+// A record fetched by HasMore() stays pending until DecodeNext() runs, so
+// calling HasMore() more than once does not skip records.
+class GenericDatumBackend : public AvroReadBackend {
+ public:
+  Result<::avro::ValidSchema> Init(
+      std::unique_ptr<AvroInputStream> input_stream) override {
+    reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
+        std::move(input_stream));
+    return reader_->dataSchema();
+  }
+
+  // DataFileReader<GenericDatum> is already initialized during construction,
+  // so only the split offset needs handling here.
+  Status InitWithSchema(const ::avro::ValidSchema& /*file_schema*/,
+                        const std::optional<Split>& split) override {
+    if (split) {
+      reader_->sync(split->offset);
+    }
+    return {};
+  }
+
+  void InitReadContext(const ::avro::ValidSchema& reader_schema) override {
+    datum_ = std::make_unique<::avro::GenericDatum>(reader_schema);
+    // A freshly created datum does not hold a record yet.
+    has_pending_record_ = false;
+  }
+
+  bool HasMore() override {
+    if (!datum_) {
+      // Tolerate HasMore() before InitReadContext() instead of dereferencing
+      // a null datum; the reader schema is what InitReadContext() receives.
+      datum_ = std::make_unique<::avro::GenericDatum>(reader_->readerSchema());
+    }
+    // Only fetch once the previous record has been consumed, so that calling
+    // HasMore() again does not silently drop a record.
+    if (!has_pending_record_) {
+      has_pending_record_ = reader_->read(*datum_);
+    }
+    return has_pending_record_;
+  }
+
+  Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
+                    ::arrow::ArrayBuilder* builder) override {
+    // The pending record is consumed whether or not appending it succeeds.
+    has_pending_record_ = false;
+    return AppendDatumToBuilder(GetReaderSchema().root(), *datum_, projection,
+                                read_schema, builder);
+  }
+
+  bool IsPastSync(int64_t split_end) const override {
+    return reader_->pastSync(split_end);
+  }
+
+  const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); }
+
+  const ::avro::ValidSchema& GetReaderSchema() const override {
+    return reader_->readerSchema();
+  }
+
+  // Safe to call repeatedly: does nothing once the reader has been released.
+  void Close() override {
+    if (reader_) {
+      reader_->close();
+      reader_.reset();
+    }
+    has_pending_record_ = false;
+  }
+
+  bool Closed() const override { return reader_ == nullptr; }
+
+ private:
+  std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
+  // Reusable datum that receives each record read by HasMore().
+  std::unique_ptr<::avro::GenericDatum> datum_;
+  // True while datum_ holds a record read by HasMore() that DecodeNext() has
+  // not consumed yet.
+  bool has_pending_record_ = false;
+};
// A stateful context to keep track of the reading progress.
struct ReadContext {
@@ -67,14 +198,10 @@ struct ReadContext {
std::shared_ptr<::arrow::Schema> arrow_schema_;
// The builder to build the record batch.
std::shared_ptr<::arrow::ArrayBuilder> builder_;
- // GenericDatum for GenericDatum-based decoding (only used if direct decoder
is
- // disabled)
- std::unique_ptr<::avro::GenericDatum> datum_;
- // Decode context for reusing scratch buffers (only used if direct decoder is
- // enabled)
- DecodeContext decode_context_;
};
+} // namespace
+
// TODO(gang.wu): collect basic reader metrics
class AvroReader::Impl {
public:
@@ -85,7 +212,6 @@ class AvroReader::Impl {
}
batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
- use_direct_decoder_ =
options.properties->Get(ReaderProperties::kAvroSkipDatum);
read_schema_ = options.projection;
// Open the input stream and adapt to the avro interface.
@@ -94,22 +220,15 @@ class AvroReader::Impl {
CreateInputStream(options,
options.properties->Get(ReaderProperties::kAvroBufferSize)));
- ::avro::ValidSchema file_schema;
-
- if (use_direct_decoder_) {
- // Create base reader for direct decoder access
- auto base_reader =
-
std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
- file_schema = base_reader->dataSchema();
- base_reader_ = std::move(base_reader);
+ // Create the appropriate backend based on configuration
+ if (options.properties->Get(ReaderProperties::kAvroSkipDatum)) {
+ backend_ = std::make_unique<DirectDecoderBackend>();
} else {
- // Create DataFileReader<GenericDatum> for GenericDatum-based decoding
- auto datum_reader =
std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
- std::move(input_stream));
- file_schema = datum_reader->dataSchema();
- datum_reader_ = std::move(datum_reader);
+ backend_ = std::make_unique<GenericDatumBackend>();
}
+ ICEBERG_ASSIGN_OR_RAISE(auto file_schema,
backend_->Init(std::move(input_stream)));
+
// Validate field ids in the file schema.
HasIdVisitor has_id_visitor;
ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
@@ -132,23 +251,13 @@ class AvroReader::Impl {
}
// Project the read schema on top of the file schema.
- // TODO(gangwu): support pruning source fields
ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_,
file_schema.root(),
/*prune_source=*/false));
- if (use_direct_decoder_) {
- // Initialize the base reader with the file schema
- base_reader_->init(file_schema);
- if (options.split) {
- base_reader_->sync(options.split->offset);
- split_end_ = options.split->offset + options.split->length;
- }
- } else {
- // The datum reader is already initialized during construction
- if (options.split) {
- datum_reader_->sync(options.split->offset);
- split_end_ = options.split->offset + options.split->length;
- }
+ ICEBERG_RETURN_UNEXPECTED(backend_->InitWithSchema(file_schema,
options.split));
+
+ if (options.split) {
+ split_end_ = options.split->offset + options.split->length;
}
return {};
@@ -163,34 +272,18 @@ class AvroReader::Impl {
if (IsPastSync()) {
break;
}
-
- if (use_direct_decoder_) {
- // Direct decoder: decode Avro to Arrow without GenericDatum
- if (!base_reader_->hasMore()) {
- break;
- }
- base_reader_->decr();
-
- ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder(
- GetReaderSchema().root(), base_reader_->decoder(), projection_,
*read_schema_,
- context_->builder_.get(), context_->decode_context_));
- } else {
- // GenericDatum-based decoding: decode via GenericDatum intermediate
- if (!datum_reader_->read(*context_->datum_)) {
- break;
- }
-
- ICEBERG_RETURN_UNEXPECTED(
- AppendDatumToBuilder(GetReaderSchema().root(), *context_->datum_,
projection_,
- *read_schema_, context_->builder_.get()));
+ if (!backend_->HasMore()) {
+ break;
}
+ ICEBERG_RETURN_UNEXPECTED(
+ backend_->DecodeNext(projection_, *read_schema_,
context_->builder_.get()));
}
return ConvertBuilderToArrowArray();
}
Status Close() {
- CloseReader();
+    // backend_ is assigned part-way through Open(), so it is still null if
+    // Open() failed early or was never called; the removed CloseReader()
+    // likewise skipped readers that were never created.
+    if (backend_ != nullptr) {
+      backend_->Close();
+    }
context_.reset();
return {};
}
@@ -209,12 +302,11 @@ class AvroReader::Impl {
}
Result<std::unordered_map<std::string, std::string>> Metadata() {
- if ((use_direct_decoder_ && !base_reader_) ||
- (!use_direct_decoder_ && !datum_reader_)) {
+ if (backend_->Closed()) {
return Invalid("Reader is not opened");
}
- const auto& metadata = GetReaderMetadata();
+ const auto& metadata = backend_->GetMetadata();
std::unordered_map<std::string, std::string> metadata_map;
metadata_map.reserve(metadata.size());
@@ -247,11 +339,7 @@ class AvroReader::Impl {
builder_result.status().message());
}
context_->builder_ = builder_result.MoveValueUnsafe();
-
- // Initialize GenericDatum for GenericDatum-based decoding
- if (!use_direct_decoder_) {
- context_->datum_ =
std::make_unique<::avro::GenericDatum>(GetReaderSchema());
- }
+ backend_->InitReadContext(backend_->GetReaderSchema());
return {};
}
@@ -281,48 +369,20 @@ class AvroReader::Impl {
if (!split_end_) {
return false;
}
- return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value())
- : datum_reader_->pastSync(split_end_.value());
- }
-
- const ::avro::Metadata& GetReaderMetadata() const {
- return use_direct_decoder_ ? base_reader_->metadata() :
datum_reader_->metadata();
- }
-
- void CloseReader() {
- if (use_direct_decoder_) {
- if (base_reader_) {
- base_reader_->close();
- base_reader_.reset();
- }
- } else {
- if (datum_reader_) {
- datum_reader_->close();
- datum_reader_.reset();
- }
- }
- }
-
- const ::avro::ValidSchema& GetReaderSchema() const {
- return use_direct_decoder_ ? base_reader_->readerSchema()
- : datum_reader_->readerSchema();
+ return backend_->IsPastSync(split_end_.value());
}
private:
// Max number of rows in the record batch to read.
int64_t batch_size_{};
- // Whether to use direct decoder (true) or GenericDatum-based decoder
(false).
- bool use_direct_decoder_{true};
// The end of the split to read and used to terminate the reading.
std::optional<int64_t> split_end_;
// The schema to read.
std::shared_ptr<::iceberg::Schema> read_schema_;
// The projection result to apply to the read schema.
SchemaProjection projection_;
- // The avro reader base - provides direct access to decoder for direct
decoding.
- std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
- // The datum reader for GenericDatum-based decoding.
- std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
+ // The read backend to read data into Arrow.
+ std::unique_ptr<AvroReadBackend> backend_;
// The context to keep track of the reading progress.
std::unique_ptr<ReadContext> context_;
};
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 0c640231..b426a756 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -52,6 +52,95 @@ Result<std::unique_ptr<AvroOutputStream>>
CreateOutputStream(const WriterOptions
return std::make_unique<AvroOutputStream>(output, buffer_size);
}
+// Abstract interface for the Avro write backends. AvroWriter::Impl selects an
+// implementation from WriterProperties::kAvroSkipDatum, calls Init() once, then
+// WriteRow() for each row of every written batch.
+class AvroWriteBackend {
+ public:
+ virtual ~AvroWriteBackend() = default;
+ // Creates the Avro file writer on `output_stream` for `avro_schema`, passing
+ // `sync_interval` and `metadata` through to the Avro writer.
+ virtual Status Init(std::unique_ptr<AvroOutputStream> output_stream,
+ const ::avro::ValidSchema& avro_schema, int64_t
sync_interval,
+ const std::map<std::string, std::vector<uint8_t>>&
metadata) = 0;
+ // Writes row `row_index` of `array` as one Avro record; `write_schema` is
+ // the schema of the rows being written (implementations may ignore it).
+ virtual Status WriteRow(const Schema& write_schema, const ::arrow::Array&
array,
+ int64_t row_index) = 0;
+ // Closes and releases the underlying writer.
+ virtual void Close() = 0;
+ // True once the underlying writer has been released (or was never created).
+ virtual bool Closed() const = 0;
+};
+
+// Backend implementation using direct Avro encoder: Arrow rows are encoded
+// straight into the DataFileWriterBase encoder, skipping avro::GenericDatum.
+class DirectEncoderBackend : public AvroWriteBackend {
+ public:
+ Status Init(std::unique_ptr<AvroOutputStream> output_stream,
+ const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+ const std::map<std::string, std::vector<uint8_t>>& metadata)
override {
+ writer_ =
std::make_unique<::avro::DataFileWriterBase>(std::move(output_stream),
+ avro_schema,
sync_interval,
+ ::avro::NULL_CODEC,
metadata);
+ // Cache the schema root; EncodeArrowToAvro needs it for every row.
+ avro_root_node_ = avro_schema.root();
+ return {};
+ }
+
+ // Encodes the row directly into the writer's encoder, then notifies the
+ // writer via incr() that one more record was written.
+ Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
+ int64_t row_index) override {
+ ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(avro_root_node_,
writer_->encoder(),
+ write_schema, array, row_index,
+ encode_ctx_));
+ writer_->incr();
+ return {};
+ }
+
+ // Safe to call repeatedly: does nothing once the writer has been released.
+ void Close() override {
+ if (writer_) {
+ writer_->close();
+ writer_.reset();
+ }
+ }
+
+ bool Closed() const override { return writer_ == nullptr; }
+
+ private:
+ // Root node of the Avro schema
+ ::avro::NodePtr avro_root_node_;
+ // The avro writer using direct encoder
+ std::unique_ptr<::avro::DataFileWriterBase> writer_;
+ // Encode context for reusing scratch buffers
+ EncodeContext encode_ctx_;
+};
+
+// Backend implementation using avro::GenericDatum as the intermediate
+// representation: each Arrow row is extracted into a reusable GenericDatum,
+// which is then written as one Avro record.
+class GenericDatumBackend : public AvroWriteBackend {
+ public:
+ Status Init(std::unique_ptr<AvroOutputStream> output_stream,
+ const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+ const std::map<std::string, std::vector<uint8_t>>& metadata)
override {
+ writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
+ std::move(output_stream), avro_schema, sync_interval,
::avro::NULL_CODEC,
+ metadata);
+ datum_ = std::make_unique<::avro::GenericDatum>(avro_schema);
+ return {};
+ }
+
+ // `write_schema` is unused: the datum already carries the Avro schema.
+ Status WriteRow(const Schema& /*write_schema*/, const ::arrow::Array& array,
+ int64_t row_index) override {
+ ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(array, row_index,
datum_.get()));
+ writer_->write(*datum_);
+ return {};
+ }
+
+ // Safe to call repeatedly: does nothing once the writer has been released.
+ void Close() override {
+ if (writer_) {
+ writer_->close();
+ writer_.reset();
+ }
+ }
+
+ bool Closed() const override { return writer_ == nullptr; }
+
+ private:
+ // The avro file writer that writes each populated datum as a record
+ std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
+ // Reusable Avro datum for writing individual records
+ std::unique_ptr<::avro::GenericDatum> datum_;
+};
+
} // namespace
class AvroWriter::Impl {
@@ -64,7 +153,6 @@ class AvroWriter::Impl {
Status Open(const WriterOptions& options) {
write_schema_ = options.schema;
- use_direct_encoder_ =
options.properties->Get(WriterProperties::kAvroSkipDatum);
::avro::NodePtr root;
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_,
&root));
@@ -82,6 +170,7 @@ class AvroWriter::Impl {
CreateOutputStream(options,
options.properties->Get(WriterProperties::kAvroBufferSize)));
arrow_output_stream_ = output_stream->arrow_output_stream();
+
std::map<std::string, std::vector<uint8_t>> metadata;
for (const auto& [key, value] : options.metadata) {
std::vector<uint8_t> vec;
@@ -90,22 +179,17 @@ class AvroWriter::Impl {
metadata.emplace(key, std::move(vec));
}
- if (use_direct_encoder_) {
- // Skip avro::GenericDatum by using encoder provided by
DataFileWriterBase.
- writer_base_ = std::make_unique<::avro::DataFileWriterBase>(
- std::move(output_stream), *avro_schema_,
- options.properties->Get(WriterProperties::kAvroSyncInterval),
- ::avro::NULL_CODEC /*codec*/, metadata);
- avro_root_node_ = avro_schema_->root();
+ // Create the appropriate backend based on configuration
+ if (options.properties->Get(WriterProperties::kAvroSkipDatum)) {
+ backend_ = std::make_unique<DirectEncoderBackend>();
} else {
- // Everything via avro::GenericDatum.
- writer_datum_ =
std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
- std::move(output_stream), *avro_schema_,
- options.properties->Get(WriterProperties::kAvroSyncInterval),
- ::avro::NULL_CODEC /*codec*/, metadata);
- datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
+ backend_ = std::make_unique<GenericDatumBackend>();
}
+ ICEBERG_RETURN_UNEXPECTED(backend_->Init(
+ std::move(output_stream), *avro_schema_,
+ options.properties->Get(WriterProperties::kAvroSyncInterval),
metadata));
+
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
return {};
}
@@ -114,45 +198,23 @@ class AvroWriter::Impl {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
::arrow::ImportArray(data, &arrow_schema_));
- if (use_direct_encoder_) {
- for (int64_t i = 0; i < result->length(); i++) {
- ICEBERG_RETURN_UNEXPECTED(
- EncodeArrowToAvro(avro_root_node_, writer_base_->encoder(),
*write_schema_,
- *result, i, encode_ctx_));
- writer_base_->incr();
- }
- } else {
- for (int64_t i = 0; i < result->length(); i++) {
- ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i,
datum_.get()));
- writer_datum_->write(*datum_);
- }
+ for (int64_t i = 0; i < result->length(); i++) {
+ ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result,
i));
}
return {};
}
Status Close() {
- if (use_direct_encoder_) {
- if (writer_base_ != nullptr) {
- writer_base_->close();
- writer_base_.reset();
- ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_,
arrow_output_stream_->Tell());
- ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
- }
- } else {
- if (writer_datum_ != nullptr) {
- writer_datum_->close();
- writer_datum_.reset();
- ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_,
arrow_output_stream_->Tell());
- ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
- }
+    // backend_ is assigned part-way through Open(); a null backend means the
+    // writer was never opened, matching the null checks in the removed branches.
+    if (backend_ != nullptr && !backend_->Closed()) {
+      backend_->Close();
+      ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell());
+      ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
}
return {};
}
- bool Closed() const {
- return use_direct_encoder_ ? writer_base_ == nullptr : writer_datum_ ==
nullptr;
- }
+  // A writer whose Open() failed before backend_ was created is reported as
+  // closed, as the removed pointer checks did (length() relies on this).
+  bool Closed() const { return backend_ == nullptr || backend_->Closed(); }
Result<int64_t> length() {
if (Closed()) {
@@ -174,21 +236,8 @@ class AvroWriter::Impl {
ArrowSchema arrow_schema_;
// Total length of the written Avro file.
int64_t total_bytes_ = 0;
-
- // Flag to determine which encoder to use
- bool use_direct_encoder_ = true;
-
- // [Encoder path] Root node of the Avro schema
- ::avro::NodePtr avro_root_node_;
- // [Encoder path] The avro writer using direct encoder
- std::unique_ptr<::avro::DataFileWriterBase> writer_base_;
- // [Encoder path] Encode context for reusing scratch buffers
- EncodeContext encode_ctx_;
-
- // [GenericDatum path] The avro writer to write the data into a datum
- std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_datum_;
- // [GenericDatum path] Reusable Avro datum for writing individual records
- std::unique_ptr<::avro::GenericDatum> datum_;
+ // The write backend to write data.
+ std::unique_ptr<AvroWriteBackend> backend_;
};
AvroWriter::~AvroWriter() = default;