This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new cd7e63e  feat: make avro and parquet reader writer more configurable (#315)
cd7e63e is described below

commit cd7e63ee8b8e376c7309261744fb972a1779990e
Author: Gang Wu <[email protected]>
AuthorDate: Thu Nov 13 22:49:10 2025 +0800

    feat: make avro and parquet reader writer more configurable (#315)
    
    - Added WriterProperties and ReaderProperties with predefined keys
    - Supported writing key-value metadata to file
    - Allowed manifest writer to customize avro schema name
    
    Fixes #306
---
 src/iceberg/avro/avro_reader.cc         |  2 +-
 src/iceberg/avro/avro_writer.cc         | 17 ++++++---
 src/iceberg/file_reader.cc              | 11 ++++++
 src/iceberg/file_reader.h               | 26 ++++++++++----
 src/iceberg/file_writer.cc              | 11 ++++++
 src/iceberg/file_writer.h               | 31 ++++++++++++++--
 src/iceberg/manifest_reader.h           | 10 ++++++
 src/iceberg/manifest_reader_internal.cc | 10 ++++++
 src/iceberg/manifest_reader_internal.h  |  4 +++
 src/iceberg/manifest_writer.cc          | 63 ++++++++++++++++++++-------------
 src/iceberg/parquet/parquet_reader.cc   |  3 +-
 src/iceberg/parquet/parquet_writer.cc   |  3 +-
 src/iceberg/test/avro_test.cc           | 28 ++++++++++++---
 src/iceberg/test/parquet_test.cc        | 56 ++++++++++++++++++++---------
 src/iceberg/type_fwd.h                  |  2 ++
 15 files changed, 213 insertions(+), 64 deletions(-)

diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 80087c8..932dd0f 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -82,7 +82,7 @@ class AvroReader::Impl {
       return InvalidArgument("Projected schema is required by Avro reader");
     }
 
-    batch_size_ = options.batch_size;
+    batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
     read_schema_ = options.projection;
 
     // Open the input stream and adapt to the avro interface.
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 4e86a68..9c8d58d 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -66,23 +66,30 @@ class AvroWriter::Impl {
 
     ::avro::NodePtr root;
     ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_, 
&root));
+    if (const auto& schema_name =
+            options.properties->Get(WriterProperties::kAvroSchemaName);
+        !schema_name.empty()) {
+      root->setName(::avro::Name(schema_name));
+    }
 
     avro_schema_ = std::make_shared<::avro::ValidSchema>(root);
 
     // Open the output stream and adapt to the avro interface.
-    constexpr int64_t kDefaultBufferSize = 1024 * 1024;
-    ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
-                            CreateOutputStream(options, kDefaultBufferSize));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto output_stream,
+        CreateOutputStream(options,
+                           
options.properties->Get(WriterProperties::kAvroBufferSize)));
     arrow_output_stream_ = output_stream->arrow_output_stream();
     std::map<std::string, std::vector<uint8_t>> metadata;
-    for (const auto& [key, value] : options.properties) {
+    for (const auto& [key, value] : options.metadata) {
       std::vector<uint8_t> vec;
       vec.reserve(value.size());
       vec.assign(value.begin(), value.end());
       metadata.emplace(key, std::move(vec));
     }
     writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
-        std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
+        std::move(output_stream), *avro_schema_,
+        options.properties->Get(WriterProperties::kAvroSyncInterval),
         ::avro::NULL_CODEC /*codec*/, metadata);
     datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
     ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
diff --git a/src/iceberg/file_reader.cc b/src/iceberg/file_reader.cc
index 59f2189..d0e291d 100644
--- a/src/iceberg/file_reader.cc
+++ b/src/iceberg/file_reader.cc
@@ -59,4 +59,15 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
   return reader;
 }
 
+std::unique_ptr<ReaderProperties> ReaderProperties::default_properties() {
+  return std::make_unique<ReaderProperties>();
+}
+
+std::unique_ptr<ReaderProperties> ReaderProperties::FromMap(
+    const std::unordered_map<std::string, std::string>& properties) {
+  auto reader_properties = std::make_unique<ReaderProperties>();
+  reader_properties->configs_ = properties;
+  return reader_properties;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index d25a5e4..d7e3b09 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -30,6 +30,7 @@
 #include "iceberg/file_format.h"
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
+#include "iceberg/util/config.h"
 
 namespace iceberg {
 
@@ -42,7 +43,7 @@ class ICEBERG_EXPORT Reader {
   Reader& operator=(const Reader&) = delete;
 
   /// \brief Open the reader.
-  virtual Status Open(const struct ReaderOptions& options) = 0;
+  virtual Status Open(const ReaderOptions& options) = 0;
 
   /// \brief Close the reader.
   virtual Status Close() = 0;
@@ -67,19 +68,30 @@ struct ICEBERG_EXPORT Split {
   size_t length;
 };
 
+class ReaderProperties : public ConfigBase<ReaderProperties> {
+ public:
+  template <typename T>
+  using Entry = const ConfigBase<ReaderProperties>::Entry<T>;
+
+  /// \brief The batch size to read.
+  inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
+
+  /// \brief Create a default ReaderProperties instance.
+  static std::unique_ptr<ReaderProperties> default_properties();
+
+  /// \brief Create a ReaderProperties instance from a map of key-value pairs.
+  static std::unique_ptr<ReaderProperties> FromMap(
+      const std::unordered_map<std::string, std::string>& properties);
+};
+
 /// \brief Options for creating a reader.
 struct ICEBERG_EXPORT ReaderOptions {
-  static constexpr int64_t kDefaultBatchSize = 4096;
-
   /// \brief The path to the file to read.
   std::string path;
   /// \brief The total length of the file.
   std::optional<size_t> length;
   /// \brief The split to read.
   std::optional<Split> split;
-  /// \brief The batch size to read. Only applies to implementations that 
support
-  /// batching.
-  int64_t batch_size = kDefaultBatchSize;
   /// \brief FileIO instance to open the file. Reader implementations should 
down cast it
   /// to the specific FileIO implementation. By default, the `iceberg-bundle` 
library uses
   /// `ArrowFileSystemFileIO` as the default implementation.
@@ -93,7 +105,7 @@ struct ICEBERG_EXPORT ReaderOptions {
   /// that may have different field names than the current schema.
   std::shared_ptr<class NameMapping> name_mapping;
   /// \brief Format-specific or implementation-specific properties.
-  std::unordered_map<std::string, std::string> properties;
+  std::shared_ptr<ReaderProperties> properties = 
ReaderProperties::default_properties();
 };
 
 /// \brief Factory function to create a reader of a specific file format.
diff --git a/src/iceberg/file_writer.cc b/src/iceberg/file_writer.cc
index e5dbea3..477562d 100644
--- a/src/iceberg/file_writer.cc
+++ b/src/iceberg/file_writer.cc
@@ -59,4 +59,15 @@ Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
   return writer;
 }
 
+std::unique_ptr<WriterProperties> WriterProperties::default_properties() {
+  return std::make_unique<WriterProperties>();
+}
+
+std::unique_ptr<WriterProperties> WriterProperties::FromMap(
+    const std::unordered_map<std::string, std::string>& properties) {
+  auto writer_properties = std::make_unique<WriterProperties>();
+  writer_properties->configs_ = properties;
+  return writer_properties;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index fba97d3..ea4e423 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -31,9 +31,34 @@
 #include "iceberg/metrics.h"
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
+#include "iceberg/util/config.h"
 
 namespace iceberg {
 
+class WriterProperties : public ConfigBase<WriterProperties> {
+ public:
+  template <typename T>
+  using Entry = const ConfigBase<WriterProperties>::Entry<T>;
+
+  /// \brief The name of the Avro root node schema to write.
+  inline static Entry<std::string> kAvroSchemaName{"write.avro.schema-name", 
""};
+
+  /// \brief The buffer size used by Avro output stream.
+  inline static Entry<int64_t> kAvroBufferSize{"write.avro.buffer-size", 1024 
* 1024};
+
+  /// \brief The sync interval used by Avro writer.
+  inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 
16 * 1024};
+
+  /// TODO(gangwu): add more properties, like compression codec, compression 
level, etc.
+
+  /// \brief Create a default WriterProperties instance.
+  static std::unique_ptr<WriterProperties> default_properties();
+
+  /// \brief Create a WriterProperties instance from a map of key-value pairs.
+  static std::unique_ptr<WriterProperties> FromMap(
+      const std::unordered_map<std::string, std::string>& properties);
+};
+
 /// \brief Options for creating a writer.
 struct ICEBERG_EXPORT WriterOptions {
   /// \brief The path to the file to write.
@@ -44,8 +69,10 @@ struct ICEBERG_EXPORT WriterOptions {
   /// to the specific FileIO implementation. By default, the `iceberg-bundle` 
library uses
   /// `ArrowFileSystemFileIO` as the default implementation.
   std::shared_ptr<class FileIO> io;
+  /// \brief Metadata to write to the file.
+  std::unordered_map<std::string, std::string> metadata;
   /// \brief Format-specific or implementation-specific properties.
-  std::unordered_map<std::string, std::string> properties;
+  std::shared_ptr<WriterProperties> properties = 
WriterProperties::default_properties();
 };
 
 /// \brief Base writer class to write data from different file formats.
@@ -57,7 +84,7 @@ class ICEBERG_EXPORT Writer {
   Writer& operator=(const Writer&) = delete;
 
   /// \brief Open the writer.
-  virtual Status Open(const struct WriterOptions& options) = 0;
+  virtual Status Open(const WriterOptions& options) = 0;
 
   /// \brief Close the writer.
   virtual Status Close() = 0;
diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h
index b1a462a..e162828 100644
--- a/src/iceberg/manifest_reader.h
+++ b/src/iceberg/manifest_reader.h
@@ -36,8 +36,13 @@ namespace iceberg {
 class ICEBERG_EXPORT ManifestReader {
  public:
   virtual ~ManifestReader() = default;
+
+  /// \brief Read all manifest entries in the manifest file.
   virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
 
+  /// \brief Get the metadata of the manifest file.
+  virtual Result<std::unordered_map<std::string, std::string>> Metadata() 
const = 0;
+
   /// \brief Creates a reader for a manifest file.
   /// \param manifest A ManifestFile object containing metadata about the 
manifest.
   /// \param file_io File IO implementation to use.
@@ -61,8 +66,13 @@ class ICEBERG_EXPORT ManifestReader {
 class ICEBERG_EXPORT ManifestListReader {
  public:
   virtual ~ManifestListReader() = default;
+
+  /// \brief Read all manifest files in the manifest list file.
   virtual Result<std::vector<ManifestFile>> Files() const = 0;
 
+  /// \brief Get the metadata of the manifest list file.
+  virtual Result<std::unordered_map<std::string, std::string>> Metadata() 
const = 0;
+
   /// \brief Creates a reader for the manifest list.
   /// \param manifest_list_location Path to the manifest list file.
   /// \param file_io File IO implementation to use.
diff --git a/src/iceberg/manifest_reader_internal.cc 
b/src/iceberg/manifest_reader_internal.cc
index 002d4a4..346f19a 100644
--- a/src/iceberg/manifest_reader_internal.cc
+++ b/src/iceberg/manifest_reader_internal.cc
@@ -548,6 +548,11 @@ Result<std::vector<ManifestEntry>> 
ManifestReaderImpl::Entries() const {
   return manifest_entries;
 }
 
+Result<std::unordered_map<std::string, std::string>> 
ManifestReaderImpl::Metadata()
+    const {
+  return reader_->Metadata();
+}
+
 Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
   std::vector<ManifestFile> manifest_files;
   ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
@@ -569,6 +574,11 @@ Result<std::vector<ManifestFile>> 
ManifestListReaderImpl::Files() const {
   return manifest_files;
 }
 
+Result<std::unordered_map<std::string, std::string>> 
ManifestListReaderImpl::Metadata()
+    const {
+  return reader_->Metadata();
+}
+
 Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
   if (index >= 0 && index < 
static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
     return static_cast<ManifestFileField>(index);
diff --git a/src/iceberg/manifest_reader_internal.h 
b/src/iceberg/manifest_reader_internal.h
index 13e3d2a..e12892e 100644
--- a/src/iceberg/manifest_reader_internal.h
+++ b/src/iceberg/manifest_reader_internal.h
@@ -40,6 +40,8 @@ class ManifestReaderImpl : public ManifestReader {
 
   Result<std::vector<ManifestEntry>> Entries() const override;
 
+  Result<std::unordered_map<std::string, std::string>> Metadata() const 
override;
+
  private:
   std::shared_ptr<Schema> schema_;
   std::unique_ptr<Reader> reader_;
@@ -55,6 +57,8 @@ class ManifestListReaderImpl : public ManifestListReader {
 
   Result<std::vector<ManifestFile>> Files() const override;
 
+  Result<std::unordered_map<std::string, std::string>> Metadata() const 
override;
+
  private:
   std::shared_ptr<Schema> schema_;
   std::unique_ptr<Reader> reader_;
diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc
index 9c2be56..fd730f2 100644
--- a/src/iceberg/manifest_writer.cc
+++ b/src/iceberg/manifest_writer.cc
@@ -58,13 +58,20 @@ ManifestContent ManifestWriter::content() const { return 
adapter_->content(); }
 Result<std::unique_ptr<Writer>> OpenFileWriter(
     std::string_view location, std::shared_ptr<Schema> schema,
     std::shared_ptr<FileIO> file_io,
-    std::unordered_map<std::string, std::string> properties) {
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
-                                               {.path = std::string(location),
-                                                .schema = std::move(schema),
-                                                .io = std::move(file_io),
-                                                .properties = 
std::move(properties)}));
+    std::unordered_map<std::string, std::string> metadata, std::string_view 
schema_name) {
+  auto writer_properties = WriterProperties::default_properties();
+  if (!schema_name.empty()) {
+    writer_properties->Set(WriterProperties::kAvroSchemaName, 
std::string(schema_name));
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
+                                           FileFormatType::kAvro,
+                                           {
+                                               .path = std::string(location),
+                                               .schema = std::move(schema),
+                                               .io = std::move(file_io),
+                                               .metadata = std::move(metadata),
+                                               .properties = 
std::move(writer_properties),
+                                           }));
   return writer;
 }
 
@@ -91,9 +98,10 @@ Result<std::unique_ptr<ManifestWriter>> 
ManifestWriter::MakeV1Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_location, std::move(schema),
-                                         std::move(file_io), 
adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_entry"));
   return std::make_unique<ManifestWriter>(std::move(writer), 
std::move(adapter));
 }
 
@@ -119,9 +127,10 @@ Result<std::unique_ptr<ManifestWriter>> 
ManifestWriter::MakeV2Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_location, std::move(schema),
-                                         std::move(file_io), 
adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_entry"));
   return std::make_unique<ManifestWriter>(std::move(writer), 
std::move(adapter));
 }
 
@@ -149,9 +158,10 @@ Result<std::unique_ptr<ManifestWriter>> 
ManifestWriter::MakeV3Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_location, std::move(schema),
-                                         std::move(file_io), 
adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+                     adapter->metadata(), "manifest_entry"));
   return std::make_unique<ManifestWriter>(std::move(writer), 
std::move(adapter));
 }
 
@@ -191,9 +201,10 @@ Result<std::unique_ptr<ManifestListWriter>> 
ManifestListWriter::MakeV1Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_list_location, 
std::move(schema),
-                                         std::move(file_io), 
adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_list_location, std::move(schema), 
std::move(file_io),
+                     adapter->metadata(), "manifest_file"));
   return std::make_unique<ManifestListWriter>(std::move(writer), 
std::move(adapter));
 }
 
@@ -207,9 +218,10 @@ Result<std::unique_ptr<ManifestListWriter>> 
ManifestListWriter::MakeV2Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_list_location, 
std::move(schema),
-                                         std::move(file_io), 
adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_list_location, std::move(schema), 
std::move(file_io),
+                     adapter->metadata(), "manifest_file"));
 
   return std::make_unique<ManifestListWriter>(std::move(writer), 
std::move(adapter));
 }
@@ -224,9 +236,10 @@ Result<std::unique_ptr<ManifestListWriter>> 
ManifestListWriter::MakeV3Writer(
   ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
 
   auto schema = adapter->schema();
-  ICEBERG_ASSIGN_OR_RAISE(auto writer,
-                          OpenFileWriter(manifest_list_location, 
std::move(schema),
-                                         std::move(file_io), 
adapter->metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer,
+      OpenFileWriter(manifest_list_location, std::move(schema), 
std::move(file_io),
+                     adapter->metadata(), "manifest_file"));
   return std::make_unique<ManifestListWriter>(std::move(writer), 
std::move(adapter));
 }
 
diff --git a/src/iceberg/parquet/parquet_reader.cc 
b/src/iceberg/parquet/parquet_reader.cc
index 64373d0..9647658 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -122,7 +122,8 @@ class ParquetReader::Impl {
     // Prepare reader properties
     ::parquet::ReaderProperties reader_properties(pool_);
     ::parquet::ArrowReaderProperties arrow_reader_properties;
-    arrow_reader_properties.set_batch_size(options.batch_size);
+    arrow_reader_properties.set_batch_size(
+        options.properties->Get(ReaderProperties::kBatchSize));
     arrow_reader_properties.set_arrow_extensions_enabled(true);
 
     // Open the Parquet file reader
diff --git a/src/iceberg/parquet/parquet_writer.cc 
b/src/iceberg/parquet/parquet_writer.cc
index 61f4671..9c8f081 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -68,7 +68,8 @@ class ParquetWriter::Impl {
 
     ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
     auto file_writer = ::parquet::ParquetFileWriter::Open(
-        output_stream_, std::move(schema_node), std::move(writer_properties));
+        output_stream_, std::move(schema_node), std::move(writer_properties),
+        std::make_shared<::arrow::KeyValueMetadata>(options.metadata));
     ICEBERG_ARROW_RETURN_NOT_OK(
         ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), 
arrow_schema_,
                                            std::move(arrow_writer_properties), 
&writer_));
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 4ca8ca1..1d421ed 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -123,9 +123,13 @@ class AvroReaderTest : public TempFileTestBase {
     auto export_result = ::arrow::ExportArray(*array, &arrow_array);
     ASSERT_TRUE(export_result.ok());
 
-    auto writer_result = WriterFactoryRegistry::Open(
-        FileFormatType::kAvro,
-        {.path = temp_avro_file_, .schema = schema, .io = file_io_});
+    std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}, 
{"k2", "v2"}};
+
+    auto writer_result =
+        WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path = 
temp_avro_file_,
+                                                            .schema = schema,
+                                                            .io = file_io_,
+                                                            .metadata = 
metadata});
     ASSERT_TRUE(writer_result.has_value());
     auto writer = std::move(writer_result.value());
     ASSERT_THAT(writer->Write(&arrow_array), IsOk());
@@ -144,6 +148,15 @@ class AvroReaderTest : public TempFileTestBase {
     auto reader = std::move(reader_result.value());
     ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
     ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
+
+    auto metadata_result = reader->Metadata();
+    ASSERT_THAT(metadata_result, IsOk());
+    auto read_metadata = std::move(metadata_result.value());
+    for (const auto& [key, value] : metadata) {
+      auto it = read_metadata.find(key);
+      ASSERT_NE(it, read_metadata.end());
+      ASSERT_EQ(it->second, value);
+    }
   }
 
   std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
@@ -191,9 +204,14 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
   auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
       SchemaField::MakeRequired(1, "id", std::make_shared<IntType>())});
 
+  auto reader_properties = ReaderProperties::default_properties();
+  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+
   auto reader_result = ReaderFactoryRegistry::Open(
-      FileFormatType::kAvro,
-      {.path = temp_avro_file_, .batch_size = 2, .io = file_io_, .projection = 
schema});
+      FileFormatType::kAvro, {.path = temp_avro_file_,
+                              .io = file_io_,
+                              .projection = schema,
+                              .properties = std::move(reader_properties)});
   ASSERT_THAT(reader_result, IsOk());
   auto reader = std::move(reader_result.value());
 
diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc
index cbf49fb..a9be5fb 100644
--- a/src/iceberg/test/parquet_test.cc
+++ b/src/iceberg/test/parquet_test.cc
@@ -63,7 +63,8 @@ Status WriteArray(std::shared_ptr<::arrow::Array> data,
 }
 
 Status ReadArray(std::shared_ptr<::arrow::Array>& out,
-                 const ReaderOptions& reader_options) {
+                 const ReaderOptions& reader_options,
+                 std::unordered_map<std::string, std::string>* metadata) {
   ICEBERG_ASSIGN_OR_RAISE(
       auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet, 
reader_options));
   ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
@@ -77,6 +78,11 @@ Status ReadArray(std::shared_ptr<::arrow::Array>& out,
   ICEBERG_ASSIGN_OR_RAISE(ArrowSchema arrow_schema, reader->Schema());
   ICEBERG_ARROW_ASSIGN_OR_RETURN(out,
                                  ::arrow::ImportArray(&arrow_c_array, 
&arrow_schema));
+
+  if (metadata) {
+    ICEBERG_ASSIGN_OR_RAISE(*metadata, reader->Metadata());
+  }
+
   return {};
 }
 
@@ -85,18 +91,25 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, 
std::shared_ptr<Schema> s
   std::shared_ptr<FileIO> file_io = 
arrow::ArrowFileSystemFileIO::MakeMockFileIO();
   const std::string basePath = "base.parquet";
 
+  std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"}, 
{"k2", "v2"}};
+
   auto writer_data = WriterFactoryRegistry::Open(
-      FileFormatType::kParquet, {.path = basePath, .schema = schema, .io = 
file_io});
+      FileFormatType::kParquet,
+      {.path = basePath, .schema = schema, .io = file_io, .metadata = 
metadata});
   ASSERT_THAT(writer_data, IsOk())
       << "Failed to create writer: " << writer_data.error().message;
   auto writer = std::move(writer_data.value());
   ASSERT_THAT(WriteArray(data, *writer), IsOk());
 
-  ASSERT_THAT(ReadArray(out, {.path = basePath,
-                              .length = writer->length(),
-                              .io = file_io,
-                              .projection = schema}),
+  std::unordered_map<std::string, std::string> read_metadata;
+  ASSERT_THAT(ReadArray(out,
+                        {.path = basePath,
+                         .length = writer->length(),
+                         .io = file_io,
+                         .projection = schema},
+                        &read_metadata),
               IsOk());
+  ASSERT_EQ(read_metadata, metadata);
 
   ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
 }
@@ -231,11 +244,14 @@ TEST_F(ParquetReaderTest, ReadWithBatchSize) {
   auto schema = std::make_shared<Schema>(
       std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
 
-  auto reader_result =
-      ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path = 
temp_parquet_file_,
-                                                             .batch_size = 2,
-                                                             .io = file_io_,
-                                                             .projection = 
schema});
+  auto reader_properties = ReaderProperties::default_properties();
+  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+
+  auto reader_result = ReaderFactoryRegistry::Open(
+      FileFormatType::kParquet, {.path = temp_parquet_file_,
+                                 .io = file_io_,
+                                 .projection = schema,
+                                 .properties = std::move(reader_properties)});
   ASSERT_THAT(reader_result, IsOk());
   auto reader = std::move(reader_result.value());
 
@@ -271,13 +287,19 @@ TEST_F(ParquetReaderTest, ReadSplit) {
       R"([[1], [2], [3]])", R"([[1], [2]])", R"([[3]])", "", "",
   };
 
+  std::shared_ptr<ReaderProperties> reader_properties =
+      ReaderProperties::default_properties();
+  reader_properties->Set(ReaderProperties::kBatchSize, int64_t{100});
+
   for (size_t i = 0; i < splits.size(); ++i) {
-    auto reader_result =
-        ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path = 
temp_parquet_file_,
-                                                               .split = 
splits[i],
-                                                               .batch_size = 
100,
-                                                               .io = file_io_,
-                                                               .projection = 
schema});
+    auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kParquet,
+                                                     {
+                                                         .path = 
temp_parquet_file_,
+                                                         .split = splits[i],
+                                                         .io = file_io_,
+                                                         .projection = schema,
+                                                         .properties = 
reader_properties,
+                                                     });
     ASSERT_THAT(reader_result, IsOk());
     auto reader = std::move(reader_result.value());
     if (!expected_json[i].empty()) {
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 62608a9..5485d83 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -139,6 +139,8 @@ class ManifestListWriter;
 class ManifestReader;
 class ManifestWriter;
 
+struct ReaderOptions;
+struct WriterOptions;
 class Reader;
 class Writer;
 

Reply via email to