This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new cd7e63e feat: make avro and parquet reader writer more configurable (#315)
cd7e63e is described below
commit cd7e63ee8b8e376c7309261744fb972a1779990e
Author: Gang Wu <[email protected]>
AuthorDate: Thu Nov 13 22:49:10 2025 +0800
feat: make avro and parquet reader writer more configurable (#315)
- Added WriterProperties and ReaderProperties with predefined keys
- Added support for writing key-value metadata to files
- Allowed the manifest writer to customize the Avro schema name
Fixes #306
---
src/iceberg/avro/avro_reader.cc | 2 +-
src/iceberg/avro/avro_writer.cc | 17 ++++++---
src/iceberg/file_reader.cc | 11 ++++++
src/iceberg/file_reader.h | 26 ++++++++++----
src/iceberg/file_writer.cc | 11 ++++++
src/iceberg/file_writer.h | 31 ++++++++++++++--
src/iceberg/manifest_reader.h | 10 ++++++
src/iceberg/manifest_reader_internal.cc | 10 ++++++
src/iceberg/manifest_reader_internal.h | 4 +++
src/iceberg/manifest_writer.cc | 63 ++++++++++++++++++++-------------
src/iceberg/parquet/parquet_reader.cc | 3 +-
src/iceberg/parquet/parquet_writer.cc | 3 +-
src/iceberg/test/avro_test.cc | 28 ++++++++++++---
src/iceberg/test/parquet_test.cc | 56 ++++++++++++++++++++---------
src/iceberg/type_fwd.h | 2 ++
15 files changed, 213 insertions(+), 64 deletions(-)
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 80087c8..932dd0f 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -82,7 +82,7 @@ class AvroReader::Impl {
return InvalidArgument("Projected schema is required by Avro reader");
}
- batch_size_ = options.batch_size;
+ batch_size_ = options.properties->Get(ReaderProperties::kBatchSize);
read_schema_ = options.projection;
// Open the input stream and adapt to the avro interface.
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 4e86a68..9c8d58d 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -66,23 +66,30 @@ class AvroWriter::Impl {
::avro::NodePtr root;
ICEBERG_RETURN_UNEXPECTED(ToAvroNodeVisitor{}.Visit(*write_schema_,
&root));
+ if (const auto& schema_name =
+ options.properties->Get(WriterProperties::kAvroSchemaName);
+ !schema_name.empty()) {
+ root->setName(::avro::Name(schema_name));
+ }
avro_schema_ = std::make_shared<::avro::ValidSchema>(root);
// Open the output stream and adapt to the avro interface.
- constexpr int64_t kDefaultBufferSize = 1024 * 1024;
- ICEBERG_ASSIGN_OR_RAISE(auto output_stream,
- CreateOutputStream(options, kDefaultBufferSize));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto output_stream,
+ CreateOutputStream(options,
+
options.properties->Get(WriterProperties::kAvroBufferSize)));
arrow_output_stream_ = output_stream->arrow_output_stream();
std::map<std::string, std::vector<uint8_t>> metadata;
- for (const auto& [key, value] : options.properties) {
+ for (const auto& [key, value] : options.metadata) {
std::vector<uint8_t> vec;
vec.reserve(value.size());
vec.assign(value.begin(), value.end());
metadata.emplace(key, std::move(vec));
}
writer_ = std::make_unique<::avro::DataFileWriter<::avro::GenericDatum>>(
- std::move(output_stream), *avro_schema_, 16 * 1024 /*syncInterval*/,
+ std::move(output_stream), *avro_schema_,
+ options.properties->Get(WriterProperties::kAvroSyncInterval),
::avro::NULL_CODEC /*codec*/, metadata);
datum_ = std::make_unique<::avro::GenericDatum>(*avro_schema_);
ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*write_schema_, &arrow_schema_));
diff --git a/src/iceberg/file_reader.cc b/src/iceberg/file_reader.cc
index 59f2189..d0e291d 100644
--- a/src/iceberg/file_reader.cc
+++ b/src/iceberg/file_reader.cc
@@ -59,4 +59,15 @@ Result<std::unique_ptr<Reader>> ReaderFactoryRegistry::Open(
return reader;
}
+// Allocates a default-constructed ReaderProperties. Nothing is copied into
+// configs_, so every key falls back to ConfigBase's handling of unset entries
+// (presumably each Entry's declared default -- confirm in iceberg/util/config.h).
+std::unique_ptr<ReaderProperties> ReaderProperties::default_properties() {
+ return std::make_unique<ReaderProperties>();
+}
+
+// Builds a ReaderProperties whose configs_ is a copy of `properties`. Keys and
+// values are stored verbatim; nothing is validated or parsed at this point.
+std::unique_ptr<ReaderProperties> ReaderProperties::FromMap(
+ const std::unordered_map<std::string, std::string>& properties) {
+ auto reader_properties = std::make_unique<ReaderProperties>();
+ reader_properties->configs_ = properties;
+ return reader_properties;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/file_reader.h b/src/iceberg/file_reader.h
index d25a5e4..d7e3b09 100644
--- a/src/iceberg/file_reader.h
+++ b/src/iceberg/file_reader.h
@@ -30,6 +30,7 @@
#include "iceberg/file_format.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
+#include "iceberg/util/config.h"
namespace iceberg {
@@ -42,7 +43,7 @@ class ICEBERG_EXPORT Reader {
Reader& operator=(const Reader&) = delete;
/// \brief Open the reader.
- virtual Status Open(const struct ReaderOptions& options) = 0;
+ virtual Status Open(const ReaderOptions& options) = 0;
/// \brief Close the reader.
virtual Status Close() = 0;
@@ -67,19 +68,30 @@ struct ICEBERG_EXPORT Split {
size_t length;
};
+/// \brief Reader configuration backed by string key-value pairs.
+///
+/// Marked ICEBERG_EXPORT (like Reader, Split and ReaderOptions) because
+/// default_properties() and FromMap() are defined out of line in
+/// file_reader.cc yet are reachable from inline code in this header, e.g. the
+/// default member initializer of ReaderOptions::properties.
+class ICEBERG_EXPORT ReaderProperties : public ConfigBase<ReaderProperties> {
+ public:
+ /// \brief Typed configuration entry (key plus default value) for this class.
+ template <typename T>
+ using Entry = const ConfigBase<ReaderProperties>::Entry<T>;
+
+ /// \brief The batch size to read. Only applies to implementations that support
+ /// batching.
+ inline static Entry<int64_t> kBatchSize{"read.batch-size", 4096};
+
+ /// \brief Create a default ReaderProperties instance.
+ static std::unique_ptr<ReaderProperties> default_properties();
+
+ /// \brief Create a ReaderProperties instance from a map of key-value pairs.
+ static std::unique_ptr<ReaderProperties> FromMap(
+ const std::unordered_map<std::string, std::string>& properties);
+};
+
/// \brief Options for creating a reader.
struct ICEBERG_EXPORT ReaderOptions {
- static constexpr int64_t kDefaultBatchSize = 4096;
-
/// \brief The path to the file to read.
std::string path;
/// \brief The total length of the file.
std::optional<size_t> length;
/// \brief The split to read.
std::optional<Split> split;
- /// \brief The batch size to read. Only applies to implementations that
support
- /// batching.
- int64_t batch_size = kDefaultBatchSize;
/// \brief FileIO instance to open the file. Reader implementations should
down cast it
/// to the specific FileIO implementation. By default, the `iceberg-bundle`
library uses
/// `ArrowFileSystemFileIO` as the default implementation.
@@ -93,7 +105,7 @@ struct ICEBERG_EXPORT ReaderOptions {
/// that may have different field names than the current schema.
std::shared_ptr<class NameMapping> name_mapping;
/// \brief Format-specific or implementation-specific properties.
- std::unordered_map<std::string, std::string> properties;
+ std::shared_ptr<ReaderProperties> properties =
ReaderProperties::default_properties();
};
/// \brief Factory function to create a reader of a specific file format.
diff --git a/src/iceberg/file_writer.cc b/src/iceberg/file_writer.cc
index e5dbea3..477562d 100644
--- a/src/iceberg/file_writer.cc
+++ b/src/iceberg/file_writer.cc
@@ -59,4 +59,15 @@ Result<std::unique_ptr<Writer>> WriterFactoryRegistry::Open(
return writer;
}
+// Allocates a default-constructed WriterProperties. Nothing is copied into
+// configs_, so every key falls back to ConfigBase's handling of unset entries
+// (presumably each Entry's declared default -- confirm in iceberg/util/config.h).
+std::unique_ptr<WriterProperties> WriterProperties::default_properties() {
+ return std::make_unique<WriterProperties>();
+}
+
+// Builds a WriterProperties whose configs_ is a copy of `properties`. Keys and
+// values are stored verbatim; nothing is validated or parsed at this point.
+std::unique_ptr<WriterProperties> WriterProperties::FromMap(
+ const std::unordered_map<std::string, std::string>& properties) {
+ auto writer_properties = std::make_unique<WriterProperties>();
+ writer_properties->configs_ = properties;
+ return writer_properties;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index fba97d3..ea4e423 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -31,9 +31,34 @@
#include "iceberg/metrics.h"
#include "iceberg/result.h"
#include "iceberg/type_fwd.h"
+#include "iceberg/util/config.h"
namespace iceberg {
+/// \brief Writer configuration backed by string key-value pairs.
+///
+/// Marked ICEBERG_EXPORT (like Writer and WriterOptions) because
+/// default_properties() and FromMap() are defined out of line in
+/// file_writer.cc yet are reachable from inline code in this header, e.g. the
+/// default member initializer of WriterOptions::properties.
+class ICEBERG_EXPORT WriterProperties : public ConfigBase<WriterProperties> {
+ public:
+ /// \brief Typed configuration entry (key plus default value) for this class.
+ template <typename T>
+ using Entry = const ConfigBase<WriterProperties>::Entry<T>;
+
+ /// \brief The name of the Avro root node schema to write. An empty value (the
+ /// default) leaves the name produced by the schema conversion unchanged.
+ inline static Entry<std::string> kAvroSchemaName{"write.avro.schema-name", ""};
+
+ /// \brief The buffer size used by Avro output stream.
+ inline static Entry<int64_t> kAvroBufferSize{"write.avro.buffer-size", 1024 * 1024};
+
+ /// \brief The sync interval used by Avro writer.
+ inline static Entry<int64_t> kAvroSyncInterval{"write.avro.sync-interval", 16 * 1024};
+
+ /// TODO(gangwu): add more properties, like compression codec, compression level, etc.
+
+ /// \brief Create a default WriterProperties instance.
+ static std::unique_ptr<WriterProperties> default_properties();
+
+ /// \brief Create a WriterProperties instance from a map of key-value pairs.
+ static std::unique_ptr<WriterProperties> FromMap(
+ const std::unordered_map<std::string, std::string>& properties);
+};
+
/// \brief Options for creating a writer.
struct ICEBERG_EXPORT WriterOptions {
/// \brief The path to the file to write.
@@ -44,8 +69,10 @@ struct ICEBERG_EXPORT WriterOptions {
/// to the specific FileIO implementation. By default, the `iceberg-bundle`
library uses
/// `ArrowFileSystemFileIO` as the default implementation.
std::shared_ptr<class FileIO> io;
+ /// \brief Metadata to write to the file.
+ std::unordered_map<std::string, std::string> metadata;
/// \brief Format-specific or implementation-specific properties.
- std::unordered_map<std::string, std::string> properties;
+ std::shared_ptr<WriterProperties> properties =
WriterProperties::default_properties();
};
/// \brief Base writer class to write data from different file formats.
@@ -57,7 +84,7 @@ class ICEBERG_EXPORT Writer {
Writer& operator=(const Writer&) = delete;
/// \brief Open the writer.
- virtual Status Open(const struct WriterOptions& options) = 0;
+ virtual Status Open(const WriterOptions& options) = 0;
/// \brief Close the writer.
virtual Status Close() = 0;
diff --git a/src/iceberg/manifest_reader.h b/src/iceberg/manifest_reader.h
index b1a462a..e162828 100644
--- a/src/iceberg/manifest_reader.h
+++ b/src/iceberg/manifest_reader.h
@@ -36,8 +36,13 @@ namespace iceberg {
class ICEBERG_EXPORT ManifestReader {
public:
virtual ~ManifestReader() = default;
+
+ /// \brief Read all manifest entries in the manifest file.
virtual Result<std::vector<ManifestEntry>> Entries() const = 0;
+ /// \brief Get the metadata of the manifest file.
+ virtual Result<std::unordered_map<std::string, std::string>> Metadata()
const = 0;
+
/// \brief Creates a reader for a manifest file.
/// \param manifest A ManifestFile object containing metadata about the
manifest.
/// \param file_io File IO implementation to use.
@@ -61,8 +66,13 @@ class ICEBERG_EXPORT ManifestReader {
class ICEBERG_EXPORT ManifestListReader {
public:
virtual ~ManifestListReader() = default;
+
+ /// \brief Read all manifest files in the manifest list file.
virtual Result<std::vector<ManifestFile>> Files() const = 0;
+ /// \brief Get the metadata of the manifest list file.
+ virtual Result<std::unordered_map<std::string, std::string>> Metadata()
const = 0;
+
/// \brief Creates a reader for the manifest list.
/// \param manifest_list_location Path to the manifest list file.
/// \param file_io File IO implementation to use.
diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc
index 002d4a4..346f19a 100644
--- a/src/iceberg/manifest_reader_internal.cc
+++ b/src/iceberg/manifest_reader_internal.cc
@@ -548,6 +548,11 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::Entries() const {
return manifest_entries;
}
+// Delegates to the wrapped Reader (reader_) and returns whatever key-value
+// metadata it exposes for the manifest file; errors propagate unchanged.
+Result<std::unordered_map<std::string, std::string>> ManifestReaderImpl::Metadata()
+ const {
+ return reader_->Metadata();
+}
+
Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
std::vector<ManifestFile> manifest_files;
ICEBERG_ASSIGN_OR_RAISE(auto arrow_schema, reader_->Schema());
@@ -569,6 +574,11 @@ Result<std::vector<ManifestFile>> ManifestListReaderImpl::Files() const {
return manifest_files;
}
+// Delegates to the wrapped Reader (reader_) and returns whatever key-value
+// metadata it exposes for the manifest list file; errors propagate unchanged.
+Result<std::unordered_map<std::string, std::string>> ManifestListReaderImpl::Metadata()
+ const {
+ return reader_->Metadata();
+}
+
Result<ManifestFileField> ManifestFileFieldFromIndex(int32_t index) {
if (index >= 0 && index <
static_cast<int32_t>(ManifestFileField::kNextUnusedId)) {
return static_cast<ManifestFileField>(index);
diff --git a/src/iceberg/manifest_reader_internal.h b/src/iceberg/manifest_reader_internal.h
index 13e3d2a..e12892e 100644
--- a/src/iceberg/manifest_reader_internal.h
+++ b/src/iceberg/manifest_reader_internal.h
@@ -40,6 +40,8 @@ class ManifestReaderImpl : public ManifestReader {
Result<std::vector<ManifestEntry>> Entries() const override;
+ Result<std::unordered_map<std::string, std::string>> Metadata() const
override;
+
private:
std::shared_ptr<Schema> schema_;
std::unique_ptr<Reader> reader_;
@@ -55,6 +57,8 @@ class ManifestListReaderImpl : public ManifestListReader {
Result<std::vector<ManifestFile>> Files() const override;
+ Result<std::unordered_map<std::string, std::string>> Metadata() const
override;
+
private:
std::shared_ptr<Schema> schema_;
std::unique_ptr<Reader> reader_;
diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc
index 9c2be56..fd730f2 100644
--- a/src/iceberg/manifest_writer.cc
+++ b/src/iceberg/manifest_writer.cc
@@ -58,13 +58,20 @@ ManifestContent ManifestWriter::content() const { return adapter_->content(); }
Result<std::unique_ptr<Writer>> OpenFileWriter(
std::string_view location, std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> file_io,
- std::unordered_map<std::string, std::string> properties) {
- ICEBERG_ASSIGN_OR_RAISE(
- auto writer, WriterFactoryRegistry::Open(FileFormatType::kAvro,
- {.path = std::string(location),
- .schema = std::move(schema),
- .io = std::move(file_io),
- .properties =
std::move(properties)}));
+ std::unordered_map<std::string, std::string> metadata, std::string_view
schema_name) {
+ auto writer_properties = WriterProperties::default_properties();
+ if (!schema_name.empty()) {
+ writer_properties->Set(WriterProperties::kAvroSchemaName,
std::string(schema_name));
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto writer, WriterFactoryRegistry::Open(
+ FileFormatType::kAvro,
+ {
+ .path = std::string(location),
+ .schema = std::move(schema),
+ .io = std::move(file_io),
+ .metadata = std::move(metadata),
+ .properties =
std::move(writer_properties),
+ }));
return writer;
}
@@ -91,9 +98,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_location, std::move(schema),
- std::move(file_io),
adapter->metadata()));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+ adapter->metadata(), "manifest_entry"));
return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
}
@@ -119,9 +127,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_location, std::move(schema),
- std::move(file_io),
adapter->metadata()));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+ adapter->metadata(), "manifest_entry"));
return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
}
@@ -149,9 +158,10 @@ Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_location, std::move(schema),
- std::move(file_io),
adapter->metadata()));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_location, std::move(schema), std::move(file_io),
+ adapter->metadata(), "manifest_entry"));
return std::make_unique<ManifestWriter>(std::move(writer),
std::move(adapter));
}
@@ -191,9 +201,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_list_location,
std::move(schema),
- std::move(file_io),
adapter->metadata()));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io),
+ adapter->metadata(), "manifest_file"));
return std::make_unique<ManifestListWriter>(std::move(writer),
std::move(adapter));
}
@@ -207,9 +218,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_list_location,
std::move(schema),
- std::move(file_io),
adapter->metadata()));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io),
+ adapter->metadata(), "manifest_file"));
return std::make_unique<ManifestListWriter>(std::move(writer),
std::move(adapter));
}
@@ -224,9 +236,10 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
auto schema = adapter->schema();
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_list_location,
std::move(schema),
- std::move(file_io),
adapter->metadata()));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_list_location, std::move(schema),
std::move(file_io),
+ adapter->metadata(), "manifest_file"));
return std::make_unique<ManifestListWriter>(std::move(writer),
std::move(adapter));
}
diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc
index 64373d0..9647658 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -122,7 +122,8 @@ class ParquetReader::Impl {
// Prepare reader properties
::parquet::ReaderProperties reader_properties(pool_);
::parquet::ArrowReaderProperties arrow_reader_properties;
- arrow_reader_properties.set_batch_size(options.batch_size);
+ arrow_reader_properties.set_batch_size(
+ options.properties->Get(ReaderProperties::kBatchSize));
arrow_reader_properties.set_arrow_extensions_enabled(true);
// Open the Parquet file reader
diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc
index 61f4671..9c8f081 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -68,7 +68,8 @@ class ParquetWriter::Impl {
ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options));
auto file_writer = ::parquet::ParquetFileWriter::Open(
- output_stream_, std::move(schema_node), std::move(writer_properties));
+ output_stream_, std::move(schema_node), std::move(writer_properties),
+ std::make_shared<::arrow::KeyValueMetadata>(options.metadata));
ICEBERG_ARROW_RETURN_NOT_OK(
::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer),
arrow_schema_,
std::move(arrow_writer_properties),
&writer_));
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 4ca8ca1..1d421ed 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -123,9 +123,13 @@ class AvroReaderTest : public TempFileTestBase {
auto export_result = ::arrow::ExportArray(*array, &arrow_array);
ASSERT_TRUE(export_result.ok());
- auto writer_result = WriterFactoryRegistry::Open(
- FileFormatType::kAvro,
- {.path = temp_avro_file_, .schema = schema, .io = file_io_});
+ std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"},
{"k2", "v2"}};
+
+ auto writer_result =
+ WriterFactoryRegistry::Open(FileFormatType::kAvro, {.path =
temp_avro_file_,
+ .schema = schema,
+ .io = file_io_,
+ .metadata =
metadata});
ASSERT_TRUE(writer_result.has_value());
auto writer = std::move(writer_result.value());
ASSERT_THAT(writer->Write(&arrow_array), IsOk());
@@ -144,6 +148,15 @@ class AvroReaderTest : public TempFileTestBase {
auto reader = std::move(reader_result.value());
ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, expected_string));
ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader));
+
+ auto metadata_result = reader->Metadata();
+ ASSERT_THAT(metadata_result, IsOk());
+ auto read_metadata = std::move(metadata_result.value());
+ for (const auto& [key, value] : metadata) {
+ auto it = read_metadata.find(key);
+ ASSERT_NE(it, read_metadata.end());
+ ASSERT_EQ(it->second, value);
+ }
}
std::shared_ptr<::arrow::fs::LocalFileSystem> local_fs_;
@@ -191,9 +204,14 @@ TEST_F(AvroReaderTest, ReadWithBatchSize) {
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(1, "id", std::make_shared<IntType>())});
+ auto reader_properties = ReaderProperties::default_properties();
+ reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+
auto reader_result = ReaderFactoryRegistry::Open(
- FileFormatType::kAvro,
- {.path = temp_avro_file_, .batch_size = 2, .io = file_io_, .projection =
schema});
+ FileFormatType::kAvro, {.path = temp_avro_file_,
+ .io = file_io_,
+ .projection = schema,
+ .properties = std::move(reader_properties)});
ASSERT_THAT(reader_result, IsOk());
auto reader = std::move(reader_result.value());
diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc
index cbf49fb..a9be5fb 100644
--- a/src/iceberg/test/parquet_test.cc
+++ b/src/iceberg/test/parquet_test.cc
@@ -63,7 +63,8 @@ Status WriteArray(std::shared_ptr<::arrow::Array> data,
}
Status ReadArray(std::shared_ptr<::arrow::Array>& out,
- const ReaderOptions& reader_options) {
+ const ReaderOptions& reader_options,
+ std::unordered_map<std::string, std::string>* metadata) {
ICEBERG_ASSIGN_OR_RAISE(
auto reader, ReaderFactoryRegistry::Open(FileFormatType::kParquet,
reader_options));
ICEBERG_ASSIGN_OR_RAISE(auto read_data, reader->Next());
@@ -77,6 +78,11 @@ Status ReadArray(std::shared_ptr<::arrow::Array>& out,
ICEBERG_ASSIGN_OR_RAISE(ArrowSchema arrow_schema, reader->Schema());
ICEBERG_ARROW_ASSIGN_OR_RETURN(out,
::arrow::ImportArray(&arrow_c_array,
&arrow_schema));
+
+ if (metadata) {
+ ICEBERG_ASSIGN_OR_RAISE(*metadata, reader->Metadata());
+ }
+
return {};
}
@@ -85,18 +91,25 @@ void DoRoundtrip(std::shared_ptr<::arrow::Array> data, std::shared_ptr<Schema> s
std::shared_ptr<FileIO> file_io =
arrow::ArrowFileSystemFileIO::MakeMockFileIO();
const std::string basePath = "base.parquet";
+ std::unordered_map<std::string, std::string> metadata = {{"k1", "v1"},
{"k2", "v2"}};
+
auto writer_data = WriterFactoryRegistry::Open(
- FileFormatType::kParquet, {.path = basePath, .schema = schema, .io =
file_io});
+ FileFormatType::kParquet,
+ {.path = basePath, .schema = schema, .io = file_io, .metadata =
metadata});
ASSERT_THAT(writer_data, IsOk())
<< "Failed to create writer: " << writer_data.error().message;
auto writer = std::move(writer_data.value());
ASSERT_THAT(WriteArray(data, *writer), IsOk());
- ASSERT_THAT(ReadArray(out, {.path = basePath,
- .length = writer->length(),
- .io = file_io,
- .projection = schema}),
+ std::unordered_map<std::string, std::string> read_metadata;
+ ASSERT_THAT(ReadArray(out,
+ {.path = basePath,
+ .length = writer->length(),
+ .io = file_io,
+ .projection = schema},
+ &read_metadata),
IsOk());
+ ASSERT_EQ(read_metadata, metadata);
ASSERT_TRUE(out != nullptr) << "Reader.Next() returned no data";
}
@@ -231,11 +244,14 @@ TEST_F(ParquetReaderTest, ReadWithBatchSize) {
auto schema = std::make_shared<Schema>(
std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
- auto reader_result =
- ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path =
temp_parquet_file_,
- .batch_size = 2,
- .io = file_io_,
- .projection =
schema});
+ auto reader_properties = ReaderProperties::default_properties();
+ reader_properties->Set(ReaderProperties::kBatchSize, int64_t{2});
+
+ auto reader_result = ReaderFactoryRegistry::Open(
+ FileFormatType::kParquet, {.path = temp_parquet_file_,
+ .io = file_io_,
+ .projection = schema,
+ .properties = std::move(reader_properties)});
ASSERT_THAT(reader_result, IsOk());
auto reader = std::move(reader_result.value());
@@ -271,13 +287,19 @@ TEST_F(ParquetReaderTest, ReadSplit) {
R"([[1], [2], [3]])", R"([[1], [2]])", R"([[3]])", "", "",
};
+ std::shared_ptr<ReaderProperties> reader_properties =
+ ReaderProperties::default_properties();
+ reader_properties->Set(ReaderProperties::kBatchSize, int64_t{100});
+
for (size_t i = 0; i < splits.size(); ++i) {
- auto reader_result =
- ReaderFactoryRegistry::Open(FileFormatType::kParquet, {.path =
temp_parquet_file_,
- .split =
splits[i],
- .batch_size =
100,
- .io = file_io_,
- .projection =
schema});
+ auto reader_result = ReaderFactoryRegistry::Open(FileFormatType::kParquet,
+ {
+ .path =
temp_parquet_file_,
+ .split = splits[i],
+ .io = file_io_,
+ .projection = schema,
+ .properties =
reader_properties,
+ });
ASSERT_THAT(reader_result, IsOk());
auto reader = std::move(reader_result.value());
if (!expected_json[i].empty()) {
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 62608a9..5485d83 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -139,6 +139,8 @@ class ManifestListWriter;
class ManifestReader;
class ManifestWriter;
+struct ReaderOptions;
+struct WriterOptions;
class Reader;
class Writer;