This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 95045a80 refactor(avro): use polymorphism for reader/writer backends (#458)
95045a80 is described below

commit 95045a809e54d1b96f27853203b2de3e42d44f4f
Author: Gang Wu <[email protected]>
AuthorDate: Tue Dec 30 19:33:42 2025 +0800

    refactor(avro): use polymorphism for reader/writer backends (#458)
    
    Refactor AvroReader and AvroWriter to use the strategy pattern for
    handling different encoding/decoding backends. This replaces the
    previous if/else logic with dedicated backend implementations (Direct vs
    GenericDatum), improving code structure and maintainability.
---
 src/iceberg/avro/avro_reader.cc | 250 +++++++++++++++++++++++++---------------
 src/iceberg/avro/avro_writer.cc | 165 ++++++++++++++++----------
 2 files changed, 262 insertions(+), 153 deletions(-)

diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 964f6d1d..aff90625 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -59,7 +59,138 @@ Result<std::unique_ptr<AvroInputStream>> CreateInputStream(const ReaderOptions&
   return std::make_unique<AvroInputStream>(file, buffer_size);
 }
 
-}  // namespace
+// Abstract base class for Avro read backends.
+class AvroReadBackend {
+ public:
+  virtual ~AvroReadBackend() = default;
+  virtual Result<::avro::ValidSchema> Init(
+      std::unique_ptr<AvroInputStream> input_stream) = 0;
+  virtual Status InitWithSchema(const ::avro::ValidSchema& file_schema,
+                                const std::optional<Split>& split) = 0;
+  virtual void InitReadContext(const ::avro::ValidSchema& reader_schema) = 0;
+  virtual bool HasMore() = 0;
+  virtual Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
+                            ::arrow::ArrayBuilder* builder) = 0;
+  virtual bool IsPastSync(int64_t split_end) const = 0;
+  virtual const ::avro::Metadata& GetMetadata() const = 0;
+  virtual const ::avro::ValidSchema& GetReaderSchema() const = 0;
+  virtual void Close() = 0;
+  virtual bool Closed() const = 0;
+};
+
+// Backend implementation using direct Avro decoder.
+class DirectDecoderBackend : public AvroReadBackend {
+ public:
+  Result<::avro::ValidSchema> Init(
+      std::unique_ptr<AvroInputStream> input_stream) override {
+    reader_ = std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
+    return reader_->dataSchema();
+  }
+
+  Status InitWithSchema(const ::avro::ValidSchema& file_schema,
+                        const std::optional<Split>& split) override {
+    reader_->init(file_schema);
+    if (split) {
+      reader_->sync(split->offset);
+    }
+    return {};
+  }
+
+  void InitReadContext(const ::avro::ValidSchema&) override {}
+
+  bool HasMore() override { return reader_->hasMore(); }
+
+  Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
+                    ::arrow::ArrayBuilder* builder) override {
+    reader_->decr();
+    return DecodeAvroToBuilder(GetReaderSchema().root(), reader_->decoder(), projection,
+                               read_schema, builder, decode_context_);
+  }
+
+  bool IsPastSync(int64_t split_end) const override {
+    return reader_->pastSync(split_end);
+  }
+
+  const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); }
+
+  const ::avro::ValidSchema& GetReaderSchema() const override {
+    return reader_->readerSchema();
+  }
+
+  void Close() override {
+    if (reader_) {
+      reader_->close();
+      reader_.reset();
+    }
+  }
+
+  bool Closed() const override { return reader_ == nullptr; }
+
+ private:
+  std::unique_ptr<::avro::DataFileReaderBase> reader_;
+  // Decode context for reusing scratch buffers
+  DecodeContext decode_context_;
+};
+
+// Backend implementation using avro::GenericDatum.
+class GenericDatumBackend : public AvroReadBackend {
+ public:
+  Result<::avro::ValidSchema> Init(
+      std::unique_ptr<AvroInputStream> input_stream) override {
+    reader_ = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
+        std::move(input_stream));
+    return reader_->dataSchema();
+  }
+
+  Status InitWithSchema(const ::avro::ValidSchema& /*file_schema*/,
+                        const std::optional<Split>& split) override {
+    if (split) {
+      reader_->sync(split->offset);
+    }
+    return {};
+  }
+
+  void InitReadContext(const ::avro::ValidSchema& reader_schema) override {
+    datum_ = std::make_unique<::avro::GenericDatum>(reader_schema);
+  }
+
+  bool HasMore() override {
+    has_more_ = reader_->read(*datum_);
+    return has_more_;
+  }
+
+  Status DecodeNext(const SchemaProjection& projection, const Schema& read_schema,
+                    ::arrow::ArrayBuilder* builder) override {
+    return AppendDatumToBuilder(GetReaderSchema().root(), *datum_, projection,
+                                read_schema, builder);
+  }
+
+  bool IsPastSync(int64_t split_end) const override {
+    return reader_->pastSync(split_end);
+  }
+
+  const ::avro::Metadata& GetMetadata() const override { return reader_->metadata(); }
+
+  const ::avro::ValidSchema& GetReaderSchema() const override {
+    return reader_->readerSchema();
+  }
+
+  void Close() override {
+    if (reader_) {
+      reader_->close();
+      reader_.reset();
+    }
+  }
+
+  bool Closed() const override { return reader_ == nullptr; }
+
+ private:
+  std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> reader_;
+  // Reusable GenericDatum for reading records
+  std::unique_ptr<::avro::GenericDatum> datum_;
+  // Cached result from HasMore()
+  bool has_more_ = false;
+};
 
 // A stateful context to keep track of the reading progress.
 struct ReadContext {
@@ -67,14 +198,10 @@ struct ReadContext {
   std::shared_ptr<::arrow::Schema> arrow_schema_;
   // The builder to build the record batch.
   std::shared_ptr<::arrow::ArrayBuilder> builder_;
-  // GenericDatum for GenericDatum-based decoding (only used if direct decoder is
-  // disabled)
-  std::unique_ptr<::avro::GenericDatum> datum_;
-  // Decode context for reusing scratch buffers (only used if direct decoder is
-  // enabled)
-  DecodeContext decode_context_;
 };
 
+}  // namespace
+
 // TODO(gang.wu): collect basic reader metrics
 class AvroReader::Impl {
  public:
@@ -85,7 +212,6 @@ class AvroReader::Impl {
     }
 
     batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
-    use_direct_decoder_ = options.properties->Get(ReaderProperties::kAvroSkipDatum);
     read_schema_ = options.projection;
 
     // Open the input stream and adapt to the avro interface.
@@ -94,22 +220,15 @@ class AvroReader::Impl {
         CreateInputStream(options,
                           options.properties->Get(ReaderProperties::kAvroBufferSize)));
 
-    ::avro::ValidSchema file_schema;
-
-    if (use_direct_decoder_) {
-      // Create base reader for direct decoder access
-      auto base_reader =
-          std::make_unique<::avro::DataFileReaderBase>(std::move(input_stream));
-      file_schema = base_reader->dataSchema();
-      base_reader_ = std::move(base_reader);
+    // Create the appropriate backend based on configuration
+    if (options.properties->Get(ReaderProperties::kAvroSkipDatum)) {
+      backend_ = std::make_unique<DirectDecoderBackend>();
     } else {
-      // Create DataFileReader<GenericDatum> for GenericDatum-based decoding
-      auto datum_reader = std::make_unique<::avro::DataFileReader<::avro::GenericDatum>>(
-          std::move(input_stream));
-      file_schema = datum_reader->dataSchema();
-      datum_reader_ = std::move(datum_reader);
+      backend_ = std::make_unique<GenericDatumBackend>();
     }
 
+    ICEBERG_ASSIGN_OR_RAISE(auto file_schema, backend_->Init(std::move(input_stream)));
+
     // Validate field ids in the file schema.
     HasIdVisitor has_id_visitor;
     ICEBERG_RETURN_UNEXPECTED(has_id_visitor.Visit(file_schema));
@@ -132,23 +251,13 @@ class AvroReader::Impl {
     }
 
     // Project the read schema on top of the file schema.
-    // TODO(gangwu): support pruning source fields
     ICEBERG_ASSIGN_OR_RAISE(projection_, Project(*read_schema_, file_schema.root(),
                                                  /*prune_source=*/false));
 
-    if (use_direct_decoder_) {
-      // Initialize the base reader with the file schema
-      base_reader_->init(file_schema);
-      if (options.split) {
-        base_reader_->sync(options.split->offset);
-        split_end_ = options.split->offset + options.split->length;
-      }
-    } else {
-      // The datum reader is already initialized during construction
-      if (options.split) {
-        datum_reader_->sync(options.split->offset);
-        split_end_ = options.split->offset + options.split->length;
-      }
+    ICEBERG_RETURN_UNEXPECTED(backend_->InitWithSchema(file_schema, options.split));
+
+    if (options.split) {
+      split_end_ = options.split->offset + options.split->length;
     }
 
     return {};
@@ -163,34 +272,18 @@ class AvroReader::Impl {
       if (IsPastSync()) {
         break;
       }
-
-      if (use_direct_decoder_) {
-        // Direct decoder: decode Avro to Arrow without GenericDatum
-        if (!base_reader_->hasMore()) {
-          break;
-        }
-        base_reader_->decr();
-
-        ICEBERG_RETURN_UNEXPECTED(DecodeAvroToBuilder(
-            GetReaderSchema().root(), base_reader_->decoder(), projection_, *read_schema_,
-            context_->builder_.get(), context_->decode_context_));
-      } else {
-        // GenericDatum-based decoding: decode via GenericDatum intermediate
-        if (!datum_reader_->read(*context_->datum_)) {
-          break;
-        }
-
-        ICEBERG_RETURN_UNEXPECTED(
-            AppendDatumToBuilder(GetReaderSchema().root(), *context_->datum_, projection_,
-                                 *read_schema_, context_->builder_.get()));
+      if (!backend_->HasMore()) {
+        break;
       }
+      ICEBERG_RETURN_UNEXPECTED(
+          backend_->DecodeNext(projection_, *read_schema_, context_->builder_.get()));
     }
 
     return ConvertBuilderToArrowArray();
   }
 
   Status Close() {
-    CloseReader();
+    if (backend_ != nullptr) backend_->Close();
     context_.reset();
     return {};
   }
@@ -209,12 +302,11 @@ class AvroReader::Impl {
   }
 
   Result<std::unordered_map<std::string, std::string>> Metadata() {
-    if ((use_direct_decoder_ && !base_reader_) ||
-        (!use_direct_decoder_ && !datum_reader_)) {
+    if (backend_ == nullptr || backend_->Closed()) {
       return Invalid("Reader is not opened");
     }
 
-    const auto& metadata = GetReaderMetadata();
+    const auto& metadata = backend_->GetMetadata();
     std::unordered_map<std::string, std::string> metadata_map;
     metadata_map.reserve(metadata.size());
 
@@ -247,11 +339,7 @@ class AvroReader::Impl {
                            builder_result.status().message());
     }
     context_->builder_ = builder_result.MoveValueUnsafe();
-
-    // Initialize GenericDatum for GenericDatum-based decoding
-    if (!use_direct_decoder_) {
-      context_->datum_ = std::make_unique<::avro::GenericDatum>(GetReaderSchema());
-    }
+    backend_->InitReadContext(backend_->GetReaderSchema());
 
     return {};
   }
@@ -281,48 +369,20 @@ class AvroReader::Impl {
     if (!split_end_) {
       return false;
     }
-    return use_direct_decoder_ ? base_reader_->pastSync(split_end_.value())
-                               : datum_reader_->pastSync(split_end_.value());
-  }
-
-  const ::avro::Metadata& GetReaderMetadata() const {
-    return use_direct_decoder_ ? base_reader_->metadata() : datum_reader_->metadata();
-  }
-
-  void CloseReader() {
-    if (use_direct_decoder_) {
-      if (base_reader_) {
-        base_reader_->close();
-        base_reader_.reset();
-      }
-    } else {
-      if (datum_reader_) {
-        datum_reader_->close();
-        datum_reader_.reset();
-      }
-    }
-  }
-
-  const ::avro::ValidSchema& GetReaderSchema() const {
-    return use_direct_decoder_ ? base_reader_->readerSchema()
-                               : datum_reader_->readerSchema();
+    return backend_->IsPastSync(split_end_.value());
   }
 
  private:
   // Max number of rows in the record batch to read.
   int64_t batch_size_{};
-  // Whether to use direct decoder (true) or GenericDatum-based decoder (false).
-  bool use_direct_decoder_{true};
   // The end of the split to read and used to terminate the reading.
   std::optional<int64_t> split_end_;
   // The schema to read.
   std::shared_ptr<::iceberg::Schema> read_schema_;
   // The projection result to apply to the read schema.
   SchemaProjection projection_;
-  // The avro reader base - provides direct access to decoder for direct decoding.
-  std::unique_ptr<::avro::DataFileReaderBase> base_reader_;
-  // The datum reader for GenericDatum-based decoding.
-  std::unique_ptr<::avro::DataFileReader<::avro::GenericDatum>> datum_reader_;
+  // The read backend to read data into Arrow.
+  std::unique_ptr<AvroReadBackend> backend_;
   // The context to keep track of the reading progress.
   std::unique_ptr<ReadContext> context_;
 };
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 0c640231..b426a756 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -52,6 +52,95 @@ Result<std::unique_ptr<AvroOutputStream>> CreateOutputStream(const WriterOptions
   return std::make_unique<AvroOutputStream>(output, buffer_size);
 }
 
+// Abstract base class for Avro write backends.
+class AvroWriteBackend {
+ public:
+  virtual ~AvroWriteBackend() = default;
+  virtual Status Init(std::unique_ptr<AvroOutputStream> output_stream,
+                      const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+                      const std::map<std::string, std::vector<uint8_t>>& metadata) = 0;
+  virtual Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
+                          int64_t row_index) = 0;
+  virtual void Close() = 0;
+  virtual bool Closed() const = 0;
+};
+
+// Backend implementation using direct Avro encoder.
+class DirectEncoderBackend : public AvroWriteBackend {
+ public:
+  Status Init(std::unique_ptr<AvroOutputStream> output_stream,
+              const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+              const std::map<std::string, std::vector<uint8_t>>& metadata) override {
+    writer_ = std::make_unique<::avro::DataFileWriterBase>(std::move(output_stream),
+                                                           avro_schema, sync_interval,
+                                                           ::avro::NULL_CODEC, metadata);
+    avro_root_node_ = avro_schema.root();
+    return {};
+  }
+
+  Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
+                  int64_t row_index) override {
+    ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(avro_root_node_, writer_->encoder(),
+                                                write_schema, array, row_index,
+                                                encode_ctx_));
+    writer_->incr();
+    return {};
+  }
+
+  void Close() override {
+    if (writer_) {
+      writer_->close();
+      writer_.reset();
+    }
+  }
+
+  bool Closed() const override { return writer_ == nullptr; }
+
+ private:
+  // Root node of the Avro schema
+  ::avro::NodePtr avro_root_node_;
+  // The avro writer using direct encoder
+  std::unique_ptr<::avro::DataFileWriterBase> writer_;
+  // Encode context for reusing scratch buffers
+  EncodeContext encode_ctx_;
+};
+
+// Backend implementation using avro::GenericDatum as the intermediate representation.
+class GenericDatumBackend : public AvroWriteBackend {
+ public:
+  Status Init(std::unique_ptr<AvroOutputStream> output_stream,
+              const ::avro::ValidSchema& avro_schema, int64_t sync_interval,
+              const std::map<std::string, std::vector<uint8_t>>& metadata) override {
+    writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
+        std::move(output_stream), avro_schema, sync_interval, ::avro::NULL_CODEC,
+        metadata);
+    datum_ = std::make_unique<::avro::GenericDatum>(avro_schema);
+    return {};
+  }
+
+  Status WriteRow(const Schema& /*write_schema*/, const ::arrow::Array& array,
+                  int64_t row_index) override {
+    ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(array, row_index, datum_.get()));
+    writer_->write(*datum_);
+    return {};
+  }
+
+  void Close() override {
+    if (writer_) {
+      writer_->close();
+      writer_.reset();
+    }
+  }
+
+  bool Closed() const override { return writer_ == nullptr; }
+
+ private:
+  // The avro writer to write the data into a datum
+  std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_;
+  // Reusable Avro datum for writing individual records
+  std::unique_ptr<::avro::GenericDatum> datum_;
+};
+
 }  // namespace
 
 class AvroWriter::Impl {
@@ -64,7 +153,6 @@ class AvroWriter::Impl {
 
   Status Open(const WriterOptions& options) {
     write_schema_ = options.schema;
-    use_direct_encoder_ = options.properties->Get(WriterProperties::kAvroSkipDatum);
 
     ::avro::NodePtr root;
     ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, &root));
@@ -82,6 +170,7 @@ class AvroWriter::Impl {
         CreateOutputStream(options,
                            options.properties->Get(WriterProperties::kAvroBufferSize)));
     arrow_output_stream_ = output_stream->arrow_output_stream();
+
     std::map<std::string, std::vector<uint8_t>> metadata;
     for (const auto& [key, value] : options.metadata) {
       std::vector<uint8_t> vec;
@@ -90,22 +179,17 @@ class AvroWriter::Impl {
       metadata.emplace(key, std::move(vec));
     }
 
-    if (use_direct_encoder_) {
-      // Skip avro::GenericDatum by using encoder provided by DataFileWriterBase.
-      writer_base_ = std::make_unique<::avro::DataFileWriterBase>(
-          std::move(output_stream), *avro_schema_,
-          options.properties->Get(WriterProperties::kAvroSyncInterval),
-          ::avro::NULL_CODEC /*codec*/, metadata);
-      avro_root_node_ = avro_schema_->root();
+    // Create the appropriate backend based on configuration
+    if (options.properties->Get(WriterProperties::kAvroSkipDatum)) {
+      backend_ = std::make_unique<DirectEncoderBackend>();
     } else {
-      // Everything via avro::GenericDatum.
-      writer_datum_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
-          std::move(output_stream), *avro_schema_,
-          options.properties->Get(WriterProperties::kAvroSyncInterval),
-          ::avro::NULL_CODEC /*codec*/, metadata);
-      datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
+      backend_ = std::make_unique<GenericDatumBackend>();
     }
 
+    ICEBERG_RETURN_UNEXPECTED(backend_->Init(
+        std::move(output_stream), *avro_schema_,
+        options.properties->Get(WriterProperties::kAvroSyncInterval), metadata));
+
     ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
     return {};
   }
@@ -114,45 +198,23 @@ class AvroWriter::Impl {
     ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
                                    ::arrow::ImportArray(data, &arrow_schema_));
 
-    if (use_direct_encoder_) {
-      for (int64_t i = 0; i < result->length(); i++) {
-        ICEBERG_RETURN_UNEXPECTED(
-            EncodeArrowToAvro(avro_root_node_, writer_base_->encoder(), *write_schema_,
-                              *result, i, encode_ctx_));
-        writer_base_->incr();
-      }
-    } else {
-      for (int64_t i = 0; i < result->length(); i++) {
-        ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i, datum_.get()));
-        writer_datum_->write(*datum_);
-      }
+    for (int64_t i = 0; i < result->length(); i++) {
+      ICEBERG_RETURN_UNEXPECTED(backend_->WriteRow(*write_schema_, *result, i));
     }
 
     return {};
   }
 
   Status Close() {
-    if (use_direct_encoder_) {
-      if (writer_base_ != nullptr) {
-        writer_base_->close();
-        writer_base_.reset();
-        ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell());
-        ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
-      }
-    } else {
-      if (writer_datum_ != nullptr) {
-        writer_datum_->close();
-        writer_datum_.reset();
-        ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell());
-        ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
-      }
+    if (backend_ != nullptr && !backend_->Closed()) {
+      backend_->Close();
+      ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, arrow_output_stream_->Tell());
+      ICEBERG_ARROW_RETURN_NOT_OK(arrow_output_stream_->Close());
     }
     return {};
   }
 
-  bool Closed() const {
-    return use_direct_encoder_ ? writer_base_ == nullptr : writer_datum_ == nullptr;
-  }
+  bool Closed() const { return backend_ == nullptr || backend_->Closed(); }
 
   Result<int64_t> length() {
     if (Closed()) {
@@ -174,21 +236,8 @@ class AvroWriter::Impl {
   ArrowSchema arrow_schema_;
   // Total length of the written Avro file.
   int64_t total_bytes_ = 0;
-
-  // Flag to determine which encoder to use
-  bool use_direct_encoder_ = true;
-
-  // [Encoder path] Root node of the Avro schema
-  ::avro::NodePtr avro_root_node_;
-  // [Encoder path] The avro writer using direct encoder
-  std::unique_ptr<::avro::DataFileWriterBase> writer_base_;
-  // [Encoder path] Encode context for reusing scratch buffers
-  EncodeContext encode_ctx_;
-
-  // [GenericDatum path] The avro writer to write the data into a datum
-  std::unique_ptr<::avro::DataFileWriter<::avro::GenericDatum>> writer_datum_;
-  // [GenericDatum path] Reusable Avro datum for writing individual records
-  std::unique_ptr<::avro::GenericDatum> datum_;
+  // The write backend to write data.
+  std::unique_ptr<AvroWriteBackend> backend_;
 };
 
 AvroWriter::~AvroWriter() = default;

Reply via email to