This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d4ad8f7f feat(avro): support writing multiple blocks (#470)
d4ad8f7f is described below
commit d4ad8f7fb4c51f67bd2a45e431d674d67ef514db
Author: Gang Wu <[email protected]>
AuthorDate: Mon Jan 5 09:48:25 2026 +0800
feat(avro): support writing multiple blocks (#470)
---
src/iceberg/avro/avro_writer.cc | 1 +
src/iceberg/test/avro_test.cc | 54 ++++++++++++++++++++++++++++++++++++++++-
2 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index b426a756..307f2fd6 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -80,6 +80,7 @@ class DirectEncoderBackend : public AvroWriteBackend {
Status WriteRow(const Schema& write_schema, const ::arrow::Array& array,
int64_t row_index) override {
+ writer_->syncIfNeeded();
ICEBERG_RETURN_UNEXPECTED(EncodeArrowToAvro(avro_root_node_,
writer_->encoder(),
write_schema, array, row_index,
encode_ctx_));
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 3ebc4c10..404f763a 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -18,15 +18,19 @@
*/
#include <sstream>
+#include <unordered_map>
#include <arrow/array.h>
#include <arrow/array/array_base.h>
#include <arrow/c/bridge.h>
#include <arrow/json/from_string.h>
+#include <avro/DataFile.hh>
+#include <avro/Generic.hh>
#include <gtest/gtest.h>
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
#include "iceberg/avro/avro_register.h"
+#include "iceberg/avro/avro_stream_internal.h"
#include "iceberg/avro/avro_writer.h"
#include "iceberg/file_reader.h"
#include "iceberg/metadata_columns.h"
@@ -34,6 +38,7 @@
#include "iceberg/schema_internal.h"
#include "iceberg/test/matchers.h"
#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
namespace iceberg::avro {
@@ -639,7 +644,9 @@ class AvroWriterTest : public ::testing::Test,
skip_datum_ = GetParam();
}
- void WriteAvroFile(std::shared_ptr<Schema> schema, const std::string& json_data) {
+ void WriteAvroFile(
+ std::shared_ptr<Schema> schema, const std::string& json_data,
+ const std::unordered_map<std::string, std::string>& extra_properties = {}) {
ArrowSchema arrow_c_schema;
ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk());
@@ -660,6 +667,9 @@ class AvroWriterTest : public ::testing::Test,
auto writer_properties = WriterProperties::default_properties();
writer_properties->Set(WriterProperties::kAvroSkipDatum, skip_datum_);
+ for (const auto& [key, value] : extra_properties) {
+ writer_properties->mutable_configs().emplace(key, value);
+ }
auto writer_result = WriterFactoryRegistry::Open(
FileFormatType::kAvro, {.path = temp_avro_file_,
@@ -884,6 +894,48 @@ TEST_P(AvroWriterTest, WriteLargeDataset) {
VerifyWrittenData(json.str());
}
+TEST_P(AvroWriterTest, MultipleAvroBlocks) {
+ auto schema = std::make_shared<Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeRequired(2, "name", string())});
+
+ const std::string json_data = R"([
+ [1, "Alice_with_a_very_long_name_to_exceed_sync_interval"],
+ [2, "Bob_with_another_very_long_name_to_exceed_sync_interval"],
+ [3, "Charlie_with_yet_another_very_long_name_to_exceed_sync"],
+ [4, "David_with_a_super_long_name_that_will_exceed_interval"],
+ [5, "Eve_with_an_extremely_long_name_to_force_new_block_here"]
+ ])";
+
+ const std::vector<std::pair</*sync_interval*/ std::string, /*num_blocks*/ size_t>>
+ test_cases = {{"32", 5}, {"65536", 1}};
+
+ for (const auto& [interval, num_blocks] : test_cases) {
+ WriteAvroFile(schema, json_data,
+ {{WriterProperties::kAvroSyncInterval.key(), interval}});
+ VerifyWrittenData(json_data);
+
+ // Use raw avro-cpp reader to count blocks by tracking previousSync() changes
+ auto mock_io = internal::checked_pointer_cast<arrow::ArrowFileSystemFileIO>(file_io_);
+ auto input = mock_io->fs()->OpenInputFile(temp_avro_file_).ValueOrDie();
+ auto input_stream = std::make_unique<AvroInputStream>(std::move(input), 1024 * 1024);
+ ::avro::DataFileReader<::avro::GenericDatum> avro_reader(std::move(input_stream));
+ ::avro::GenericDatum datum(avro_reader.dataSchema());
+
+ size_t block_count = 0;
+ int64_t last_sync = -1;
+
+ while (avro_reader.read(datum)) {
+ if (int64_t current_sync = avro_reader.previousSync(); current_sync != last_sync) {
+ block_count++;
+ last_sync = current_sync;
+ }
+ }
+
+ ASSERT_EQ(block_count, num_blocks);
+ }
+}
+
// Instantiate parameterized tests for both direct encoder and GenericDatum paths
INSTANTIATE_TEST_SUITE_P(DirectEncoderModes, AvroWriterTest,
::testing::Values(true, false),