This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 8906ff3 feat: implement manifest and manifest list writer adapter
(cont'd) (#216)
8906ff3 is described below
commit 8906ff3e3a34f7f34ad8ca6d6f25388e80f32830
Author: dongxiao <[email protected]>
AuthorDate: Fri Oct 17 09:37:23 2025 +0800
feat: implement manifest and manifest list writer adapter (cont'd) (#216)
---
src/iceberg/CMakeLists.txt | 14 +-
src/iceberg/arrow/arrow_fs_file_io.cc | 2 +-
...ransform_internal.h => arrow_status_internal.h} | 0
.../nanoarrow_status_internal.h} | 40 +-
src/iceberg/avro/avro_data_util.cc | 2 +-
src/iceberg/avro/avro_reader.cc | 2 +-
src/iceberg/avro/avro_writer.cc | 8 +-
src/iceberg/avro/avro_writer.h | 2 +-
src/iceberg/file_writer.h | 3 +-
src/iceberg/manifest_adapter.cc | 683 +++++++++++++++++++++
src/iceberg/manifest_adapter.h | 87 ++-
src/iceberg/manifest_entry.cc | 10 +-
src/iceberg/manifest_entry.h | 4 +-
src/iceberg/manifest_reader_internal.cc | 18 +-
src/iceberg/manifest_writer.cc | 108 ++--
src/iceberg/manifest_writer.h | 13 +-
src/iceberg/meson.build | 4 +
src/iceberg/parquet/parquet_data_util.cc | 2 +-
src/iceberg/parquet/parquet_reader.cc | 2 +-
src/iceberg/parquet/parquet_writer.cc | 8 +-
src/iceberg/parquet/parquet_writer.h | 2 +-
src/iceberg/partition_spec.cc | 47 ++
src/iceberg/partition_spec.h | 15 +-
src/iceberg/test/CMakeLists.txt | 4 +-
src/iceberg/test/avro_test.cc | 2 +-
...test.cc => manifest_list_reader_writer_test.cc} | 84 ++-
...ader_test.cc => manifest_reader_writer_test.cc} | 78 ++-
src/iceberg/test/parquet_test.cc | 4 +-
src/iceberg/test/partition_spec_test.cc | 19 +
src/iceberg/type_fwd.h | 1 +
src/iceberg/v1_metadata.cc | 120 ++++
src/iceberg/v1_metadata.h | 34 +-
src/iceberg/v2_metadata.cc | 157 +++++
src/iceberg/v2_metadata.h | 42 +-
src/iceberg/v3_metadata.cc | 213 +++++++
src/iceberg/v3_metadata.h | 58 +-
36 files changed, 1679 insertions(+), 213 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 8c9d6a9..75ac9c8 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -18,6 +18,7 @@
set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
"$<BUILD_INTERFACE:${PROJECT_SOURCE_DIR}/src>")
set(ICEBERG_SOURCES
+ arrow_c_data_guard_internal.cc
catalog/memory/in_memory_catalog.cc
expression/expression.cc
expression/expressions.cc
@@ -28,8 +29,12 @@ set(ICEBERG_SOURCES
file_writer.cc
inheritable_metadata.cc
json_internal.cc
+ manifest_adapter.cc
manifest_entry.cc
manifest_list.cc
+ manifest_reader.cc
+ manifest_reader_internal.cc
+ manifest_writer.cc
metadata_columns.cc
name_mapping.cc
partition_field.cc
@@ -51,16 +56,15 @@ set(ICEBERG_SOURCES
transform.cc
transform_function.cc
type.cc
- manifest_reader.cc
- manifest_reader_internal.cc
- manifest_writer.cc
- arrow_c_data_guard_internal.cc
util/conversions.cc
util/decimal.cc
util/gzip_internal.cc
util/murmurhash3_internal.cc
util/timepoint.cc
- util/uuid.cc)
+ util/uuid.cc
+ v1_metadata.cc
+ v2_metadata.cc
+ v3_metadata.cc)
set(ICEBERG_STATIC_BUILD_INTERFACE_LIBS)
set(ICEBERG_SHARED_BUILD_INTERFACE_LIBS)
diff --git a/src/iceberg/arrow/arrow_fs_file_io.cc
b/src/iceberg/arrow/arrow_fs_file_io.cc
index 35e0d4e..be62b79 100644
--- a/src/iceberg/arrow/arrow_fs_file_io.cc
+++ b/src/iceberg/arrow/arrow_fs_file_io.cc
@@ -22,9 +22,9 @@
#include <arrow/filesystem/localfs.h>
#include <arrow/filesystem/mockfs.h>
-#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_file_io.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
namespace iceberg::arrow {
diff --git a/src/iceberg/arrow/arrow_error_transform_internal.h
b/src/iceberg/arrow/arrow_status_internal.h
similarity index 100%
rename from src/iceberg/arrow/arrow_error_transform_internal.h
rename to src/iceberg/arrow/arrow_status_internal.h
diff --git a/src/iceberg/avro/avro_writer.h
b/src/iceberg/arrow/nanoarrow_status_internal.h
similarity index 56%
copy from src/iceberg/avro/avro_writer.h
copy to src/iceberg/arrow/nanoarrow_status_internal.h
index 57499d8..86f396a 100644
--- a/src/iceberg/avro/avro_writer.h
+++ b/src/iceberg/arrow/nanoarrow_status_internal.h
@@ -19,33 +19,13 @@
#pragma once
-#include "iceberg/file_writer.h"
-#include "iceberg/iceberg_bundle_export.h"
-
-namespace iceberg::avro {
-
-/// \brief A writer for serializing ArrowArray to Avro files.
-class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
- public:
- AvroWriter() = default;
-
- ~AvroWriter() override;
-
- Status Open(const WriterOptions& options) final;
-
- Status Close() final;
-
- Status Write(ArrowArray data) final;
-
- std::optional<Metrics> metrics() final;
-
- std::optional<int64_t> length() final;
-
- std::vector<int64_t> split_offsets() final;
-
- private:
- class Impl;
- std::unique_ptr<Impl> impl_;
-};
-
-} // namespace iceberg::avro
+/// \brief Return an InvalidArrowData error from the enclosing function when a
+/// nanoarrow call does not yield NANOARROW_OK.
+///
+/// `status` is evaluated exactly once and cached in the if-initializer, so a
+/// call expression with side effects (e.g. appending to an array) is not
+/// executed a second time while the error message is being formatted.
+#define ICEBERG_NANOARROW_RETURN_UNEXPECTED(status)                        \
+  if (const auto iceberg_nanoarrow_status_ = (status);                     \
+      iceberg_nanoarrow_status_ != NANOARROW_OK) [[unlikely]] {            \
+    return iceberg::InvalidArrowData("nanoarrow error: {}",                \
+                                     iceberg_nanoarrow_status_);           \
+  }
+
+/// \brief Same as ICEBERG_NANOARROW_RETURN_UNEXPECTED, but also reports the
+/// `message` member of `error` (the ArrowError passed to the failing call).
+///
+/// `status` is evaluated exactly once; `error` is only read on failure.
+#define ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error)      \
+  if (const auto iceberg_nanoarrow_status_ = (status);                     \
+      iceberg_nanoarrow_status_ != NANOARROW_OK) [[unlikely]] {            \
+    return iceberg::InvalidArrowData("nanoarrow error: {} msg: {}",        \
+                                     iceberg_nanoarrow_status_,            \
+                                     (error).message);                     \
+  }
diff --git a/src/iceberg/avro/avro_data_util.cc
b/src/iceberg/avro/avro_data_util.cc
index 8853f74..92a3e7a 100644
--- a/src/iceberg/avro/avro_data_util.cc
+++ b/src/iceberg/avro/avro_data_util.cc
@@ -32,7 +32,7 @@
#include <avro/NodeImpl.hh>
#include <avro/Types.hh>
-#include "iceberg/arrow/arrow_error_transform_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_schema_util_internal.h"
#include "iceberg/schema.h"
diff --git a/src/iceberg/avro/avro_reader.cc b/src/iceberg/avro/avro_reader.cc
index 6452612..80087c8 100644
--- a/src/iceberg/avro/avro_reader.cc
+++ b/src/iceberg/avro/avro_reader.cc
@@ -31,8 +31,8 @@
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
-#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
diff --git a/src/iceberg/avro/avro_writer.cc b/src/iceberg/avro/avro_writer.cc
index 4d82fb9..ded0d48 100644
--- a/src/iceberg/avro/avro_writer.cc
+++ b/src/iceberg/avro/avro_writer.cc
@@ -29,8 +29,8 @@
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
-#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/avro/avro_data_util_internal.h"
#include "iceberg/avro/avro_register.h"
#include "iceberg/avro/avro_schema_util_internal.h"
@@ -75,9 +75,9 @@ class AvroWriter::Impl {
return {};
}
- Status Write(ArrowArray data) {
+ Status Write(ArrowArray* data) {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto result,
- ::arrow::ImportArray(&data,
&arrow_schema_));
+ ::arrow::ImportArray(data, &arrow_schema_));
for (int64_t i = 0; i < result->length(); i++) {
ICEBERG_RETURN_UNEXPECTED(ExtractDatumFromArray(*result, i,
datum_.get()));
@@ -119,7 +119,7 @@ class AvroWriter::Impl {
AvroWriter::~AvroWriter() = default;
-Status AvroWriter::Write(ArrowArray data) { return impl_->Write(data); }
+Status AvroWriter::Write(ArrowArray* data) { return impl_->Write(data); }
Status AvroWriter::Open(const WriterOptions& options) {
impl_ = std::make_unique<Impl>();
diff --git a/src/iceberg/avro/avro_writer.h b/src/iceberg/avro/avro_writer.h
index 57499d8..0a5dd0b 100644
--- a/src/iceberg/avro/avro_writer.h
+++ b/src/iceberg/avro/avro_writer.h
@@ -35,7 +35,7 @@ class ICEBERG_BUNDLE_EXPORT AvroWriter : public Writer {
Status Close() final;
- Status Write(ArrowArray data) final;
+ Status Write(ArrowArray* data) final;
std::optional<Metrics> metrics() final;
diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h
index 135151a..fba97d3 100644
--- a/src/iceberg/file_writer.h
+++ b/src/iceberg/file_writer.h
@@ -65,7 +65,8 @@ class ICEBERG_EXPORT Writer {
/// \brief Write arrow data to the file.
///
/// \return Status of write results.
- virtual Status Write(ArrowArray data) = 0;
+ /// \note Ownership of the data is transferred to the writer.
+ virtual Status Write(ArrowArray* data) = 0;
/// \brief Get the file statistics.
/// Only valid after the file is closed.
diff --git a/src/iceberg/manifest_adapter.cc b/src/iceberg/manifest_adapter.cc
new file mode 100644
index 0000000..bc0f834
--- /dev/null
+++ b/src/iceberg/manifest_adapter.cc
@@ -0,0 +1,683 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest_adapter.h"
+
+#include <nanoarrow/nanoarrow.h>
+
+#include "iceberg/arrow/nanoarrow_status_internal.h"
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+
+/// \brief Append one signed integer to `array` via ArrowArrayAppendInt.
+Status AppendField(ArrowArray* array, int64_t value) {
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, value));
+  return {};
+}
+
+/// \brief Append one unsigned integer to `array` via ArrowArrayAppendUInt.
+Status AppendField(ArrowArray* array, uint64_t value) {
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendUInt(array, value));
+  return {};
+}
+
+/// \brief Append one floating point value to `array` via ArrowArrayAppendDouble.
+Status AppendField(ArrowArray* array, double value) {
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendDouble(array, value));
+  return {};
+}
+
+/// \brief Append the characters of `value` to `array` via ArrowArrayAppendString.
+Status AppendField(ArrowArray* array, std::string_view value) {
+  ArrowStringView text{};
+  text.data = value.data();
+  text.size_bytes = static_cast<int64_t>(value.size());
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendString(array, text));
+  return {};
+}
+
+/// \brief Append the bytes of `value` to `array` via ArrowArrayAppendBytes.
+Status AppendField(ArrowArray* array, std::span<const uint8_t> value) {
+  ArrowBufferView bytes{};
+  // The view's union exposes a typed uint8 pointer, so no cast is required.
+  bytes.data.as_uint8 = value.data();
+  bytes.size_bytes = static_cast<int64_t>(value.size());
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendBytes(array, bytes));
+  return {};
+}
+
+/// \brief Shared implementation of the AppendList overloads.
+///
+/// Appends every element of `values` (widened to int64_t) to the list's child
+/// array `array->children[0]`, then finishes one list element on `array`.
+template <typename Int>
+Status AppendIntegerList(ArrowArray* array, const std::vector<Int>& values) {
+  ArrowArray* const items = array->children[0];
+  for (const Int item : values) {
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(
+        ArrowArrayAppendInt(items, static_cast<int64_t>(item)));
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array));
+  return {};
+}
+
+/// \brief Append `list_value` as a single list element of 32-bit integers.
+Status AppendList(ArrowArray* array, const std::vector<int32_t>& list_value) {
+  return AppendIntegerList<int32_t>(array, list_value);
+}
+
+/// \brief Append `list_value` as a single list element of 64-bit integers.
+Status AppendList(ArrowArray* array, const std::vector<int64_t>& list_value) {
+  return AppendIntegerList<int64_t>(array, list_value);
+}
+
+/// \brief Shared implementation of the AppendMap overloads.
+///
+/// Writes each (key, value) pair of `entries` into the two children of the
+/// map's entries array (`array->children[0]`), finishing one entries element
+/// per pair, and finally finishes a single map element on `array`.
+///
+/// \return InvalidArrowData if the entries array does not have exactly two
+/// children; otherwise the first error reported while appending, if any.
+template <typename Value>
+Status AppendMapEntries(ArrowArray* array, const std::map<int32_t, Value>& entries) {
+  ArrowArray* const entries_array = array->children[0];
+  if (entries_array->n_children != 2) {
+    return InvalidArrowData("Map array must have exactly 2 children.");
+  }
+  ArrowArray* const keys = entries_array->children[0];
+  ArrowArray* const values = entries_array->children[1];
+  for (const auto& [key, value] : entries) {
+    ICEBERG_RETURN_UNEXPECTED(AppendField(keys, static_cast<int64_t>(key)));
+    ICEBERG_RETURN_UNEXPECTED(AppendField(values, value));
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(entries_array));
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array));
+  return {};
+}
+
+/// \brief Append `map_value` as one map element with int64 values.
+Status AppendMap(ArrowArray* array, const std::map<int32_t, int64_t>& map_value) {
+  return AppendMapEntries<int64_t>(array, map_value);
+}
+
+/// \brief Append `map_value` as one map element with binary values.
+Status AppendMap(ArrowArray* array,
+                 const std::map<int32_t, std::vector<uint8_t>>& map_value) {
+  return AppendMapEntries<std::vector<uint8_t>>(array, map_value);
+}
+
+} // namespace
+
+/// \brief Prepare a fresh Arrow array for a new batch of appended rows.
+///
+/// Fails with InvalidArgument while `size_` still counts rows of a previous
+/// batch. Otherwise `array_` is reset, initialized from `schema_` and switched
+/// into nanoarrow's appending state.
+Status ManifestAdapter::StartAppending() {
+  if (size_ > 0) {
+    return InvalidArgument("Adapter buffer not empty, cannot start appending.");
+  }
+  size_ = 0;
+  array_ = {};
+  ArrowError init_error;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayInitFromSchema(&array_, &schema_, &init_error), init_error);
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayStartAppending(&array_));
+  return {};
+}
+
+/// \brief Finish building the array that rows were appended to.
+///
+/// \return A pointer to the adapter's own `array_` member on success, or an
+/// InvalidArrowData error carrying nanoarrow's message on failure.
+Result<ArrowArray*> ManifestAdapter::FinishAppending() {
+  ArrowError build_error;
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(
+      ArrowArrayFinishBuildingDefault(&array_, &build_error), build_error);
+  ArrowArray* const finished = &array_;
+  return finished;
+}
+
+/// \brief Release the Arrow array and schema if they still hold resources.
+ManifestEntryAdapter::~ManifestEntryAdapter() {
+  // A null `release` callback means there is nothing left to release.
+  if (array_.release) {
+    ArrowArrayRelease(&array_);
+  }
+  if (schema_.release) {
+    ArrowSchemaRelease(&schema_);
+  }
+}
+
+/// \brief Build the manifest entry struct type for this adapter.
+///
+/// With a partition spec, the entry type is derived from the spec's partition
+/// type; without one, `nullptr` is passed as the partition type.
+Result<std::shared_ptr<StructType>> ManifestEntryAdapter::GetManifestEntryType() {
+  if (partition_spec_ != nullptr) [[likely]] {
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_type, partition_spec_->PartitionType());
+    return ManifestEntry::TypeFromPartitionType(std::move(partition_type));
+  }
+  return ManifestEntry::TypeFromPartitionType(nullptr);
+}
+
+/// \brief Append one partition tuple to the partition struct array.
+///
+/// \param array Struct array with one child per partition field.
+/// \param partition_type Struct type describing the partition fields.
+/// \param partition_values One literal per partition field, in field order.
+/// \return InvalidArrowData when the child count of `array` or the number of
+/// literals differs from the number of partition fields; InvalidManifest for a
+/// partition field type that cannot be written; otherwise the first append
+/// error, if any.
+Status ManifestEntryAdapter::AppendPartitionValues(
+    ArrowArray* array, const std::shared_ptr<StructType>& partition_type,
+    const std::vector<Literal>& partition_values) {
+  const auto& fields = partition_type->fields();
+  const size_t field_count = fields.size();
+  // Compare in the unsigned domain only after ruling out a negative count.
+  if (array->n_children < 0 ||
+      static_cast<size_t>(array->n_children) != field_count) [[unlikely]] {
+    return InvalidArrowData("Arrow array of partition does not match partition type.");
+  }
+  if (partition_values.size() != field_count) [[unlikely]] {
+    return InvalidArrowData("Literal list of partition does not match partition type.");
+  }
+
+  for (size_t i = 0; i < field_count; ++i) {
+    const auto& partition_value = partition_values[i];
+    const auto& partition_field = fields[i];
+    ArrowArray* const child_array = array->children[i];
+    if (partition_value.IsNull()) {
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
+      continue;
+    }
+    switch (partition_field.type()->type_id()) {
+      case TypeId::kBoolean:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            child_array,
+            static_cast<uint64_t>(std::get<bool>(partition_value.value()) ? 1 : 0)));
+        break;
+      case TypeId::kInt:
+      case TypeId::kDate:
+        // Both are held as int32_t in the literal and widened for nanoarrow.
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            child_array,
+            static_cast<int64_t>(std::get<int32_t>(partition_value.value()))));
+        break;
+      case TypeId::kLong:
+      case TypeId::kTime:
+      case TypeId::kTimestamp:
+      case TypeId::kTimestampTz:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(child_array, std::get<int64_t>(partition_value.value())));
+        break;
+      case TypeId::kFloat:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            child_array,
+            static_cast<double>(std::get<float>(partition_value.value()))));
+        break;
+      case TypeId::kDouble:
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(child_array, std::get<double>(partition_value.value())));
+        break;
+      case TypeId::kString:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            child_array, std::get<std::string>(partition_value.value())));
+        break;
+      case TypeId::kFixed:
+      case TypeId::kBinary:
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            child_array, std::get<std::vector<uint8_t>>(partition_value.value())));
+        break;
+      case TypeId::kDecimal:
+        // NOTE(review): the 16 raw bytes are written with ArrowArrayAppendBytes;
+        // confirm nanoarrow accepts that call for a decimal-typed child array.
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            child_array, std::get<std::array<uint8_t, 16>>(partition_value.value())));
+        break;
+      case TypeId::kUuid:
+      case TypeId::kStruct:
+      case TypeId::kList:
+      case TypeId::kMap:
+        // TODO(xiao.dong) Literals do not currently support these types
+      default:
+        return InvalidManifest("Unsupported partition type: {}",
+                               partition_field.ToString());
+    }
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array));
+  return {};
+}
+
+/// \brief Append one `DataFile` to the data_file struct array.
+///
+/// Walks the fields of `data_file_type` in order and appends the matching
+/// member of `file` to the child array at the same position. first_row_id,
+/// referenced_data_file, content_offset and content_size_in_bytes are obtained
+/// through the adapter's Get* hooks rather than read from `file` directly, so
+/// every such field follows the same path.
+///
+/// \param array Struct array with one child per field of `data_file_type`.
+/// \param data_file_type Struct type listing the data file fields to write.
+/// \param file Data file whose members are appended.
+/// \return InvalidArrowData when the child count of `array` does not match the
+/// number of fields; InvalidManifest for an unknown field id; otherwise the
+/// first append error, if any.
+Status ManifestEntryAdapter::AppendDataFile(
+    ArrowArray* array, const std::shared_ptr<StructType>& data_file_type,
+    const DataFile& file) {
+  const auto& fields = data_file_type->fields();
+  const size_t field_count = fields.size();
+  // Guard the positional child access below against a mismatching array.
+  if (array->n_children < 0 ||
+      static_cast<size_t>(array->n_children) != field_count) [[unlikely]] {
+    return InvalidArrowData("Arrow array of data file does not match data file type.");
+  }
+
+  // Appends the value when present, otherwise a single null.
+  const auto append_optional_int = [](ArrowArray* target,
+                                      std::optional<int64_t> maybe) -> Status {
+    if (maybe.has_value()) {
+      return AppendField(target, maybe.value());
+    }
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(target, 1));
+    return {};
+  };
+
+  for (size_t i = 0; i < field_count; ++i) {
+    const auto& field = fields[i];
+    ArrowArray* const child_array = array->children[i];
+
+    switch (field.field_id()) {
+      case 134:  // content (optional int32)
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(child_array, static_cast<int64_t>(file.content)));
+        break;
+      case 100:  // file_path (required string)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.file_path));
+        break;
+      case 101:  // file_format (required string)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, ToString(file.file_format)));
+        break;
+      case 102: {
+        // partition (required struct)
+        auto partition_type = internal::checked_pointer_cast<StructType>(field.type());
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendPartitionValues(child_array, partition_type, file.partition));
+      } break;
+      case 103:  // record_count (required int64)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.record_count));
+        break;
+      case 104:  // file_size_in_bytes (required int64)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.file_size_in_bytes));
+        break;
+      case 105:  // block_size_in_bytes (compatible in v1)
+        // always 64MB for v1
+        ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, kBlockSizeInBytesV1));
+        break;
+      case 108:  // column_sizes (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.column_sizes));
+        break;
+      case 109:  // value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.value_counts));
+        break;
+      case 110:  // null_value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.null_value_counts));
+        break;
+      case 137:  // nan_value_counts (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.nan_value_counts));
+        break;
+      case 125:  // lower_bounds (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.lower_bounds));
+        break;
+      case 128:  // upper_bounds (optional map)
+        ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.upper_bounds));
+        break;
+      case 131:  // key_metadata (optional binary)
+        // Empty key metadata is written as null.
+        if (!file.key_metadata.empty()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.key_metadata));
+        } else {
+          ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
+        }
+        break;
+      case 132:  // split_offsets (optional list)
+        ICEBERG_RETURN_UNEXPECTED(AppendList(child_array, file.split_offsets));
+        break;
+      case 135:  // equality_ids (optional list)
+        ICEBERG_RETURN_UNEXPECTED(AppendList(child_array, file.equality_ids));
+        break;
+      case 140:  // sort_order_id (optional int32)
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(child_array, file.sort_order_id));
+        break;
+      case 142: {
+        // first_row_id (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(child_array, first_row_id));
+        break;
+      }
+      case 143: {
+        // referenced_data_file (optional string)
+        ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, GetReferenceDataFile(file));
+        if (referenced_data_file.has_value()) {
+          ICEBERG_RETURN_UNEXPECTED(
+              AppendField(child_array, referenced_data_file.value()));
+        } else {
+          ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
+        }
+        break;
+      }
+      case 144: {
+        // content_offset (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto content_offset, GetContentOffset(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(child_array, content_offset));
+        break;
+      }
+      case 145: {
+        // content_size_in_bytes (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto content_size, GetContentSizeInBytes(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(child_array, content_size));
+        break;
+      }
+      default:
+        return InvalidManifest("Unknown data file field id: {} ", field.field_id());
+    }
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array));
+  return {};
+}
+
+// Default field hooks: each returns the corresponding member unchanged.
+// NOTE(review): presumably these are overridden by version-specific adapters;
+// confirm against the declarations in manifest_adapter.h.
+
+/// \brief Sequence number to write for `entry`.
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetSequenceNumber(
+    const ManifestEntry& entry) const {
+  const auto& sequence_number = entry.sequence_number;
+  return sequence_number;
+}
+
+/// \brief Referenced data file path to write for `file`.
+Result<std::optional<std::string>> ManifestEntryAdapter::GetReferenceDataFile(
+    const DataFile& file) const {
+  const auto& referenced_data_file = file.referenced_data_file;
+  return referenced_data_file;
+}
+
+/// \brief First row id to write for `file`.
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetFirstRowId(
+    const DataFile& file) const {
+  const auto& first_row_id = file.first_row_id;
+  return first_row_id;
+}
+
+/// \brief Content offset to write for `file`.
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetContentOffset(
+    const DataFile& file) const {
+  const auto& content_offset = file.content_offset;
+  return content_offset;
+}
+
+/// \brief Content size in bytes to write for `file`.
+Result<std::optional<int64_t>> ManifestEntryAdapter::GetContentSizeInBytes(
+    const DataFile& file) const {
+  const auto& content_size_in_bytes = file.content_size_in_bytes;
+  return content_size_in_bytes;
+}
+
+/// \brief Append one manifest entry as a new row of `array_`.
+///
+/// Walks the fields of `manifest_schema_` in order and appends the matching
+/// member of `entry` to the child of `array_` at the same position. On success
+/// the row is finished and `size_` is incremented.
+///
+/// \return InvalidArrowData when the child count of `array_` does not match
+/// the schema; InvalidManifest when `entry.data_file` is missing or a field id
+/// is unknown; otherwise the first append error, if any.
+Status ManifestEntryAdapter::AppendInternal(const ManifestEntry& entry) {
+  const auto& fields = manifest_schema_->fields();
+  const size_t field_count = fields.size();
+  // Guard the positional child access below against a mismatching array.
+  if (array_.n_children < 0 ||
+      static_cast<size_t>(array_.n_children) != field_count) [[unlikely]] {
+    return InvalidArrowData("Arrow array of manifest entry does not match its schema.");
+  }
+
+  // Appends the value when present, otherwise a single null.
+  const auto append_optional_int = [](ArrowArray* target,
+                                      std::optional<int64_t> maybe) -> Status {
+    if (maybe.has_value()) {
+      return AppendField(target, maybe.value());
+    }
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(target, 1));
+    return {};
+  };
+
+  for (size_t i = 0; i < field_count; ++i) {
+    const auto& field = fields[i];
+    ArrowArray* const array = array_.children[i];
+
+    switch (field.field_id()) {
+      case 0:  // status (required int32)
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<int64_t>(static_cast<int32_t>(entry.status))));
+        break;
+      case 1:  // snapshot_id (optional int64)
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(array, entry.snapshot_id));
+        break;
+      case 2:  // data_file (required struct)
+        if (entry.data_file) {
+          // Get the data file type from the field
+          auto data_file_type = internal::checked_pointer_cast<StructType>(field.type());
+          ICEBERG_RETURN_UNEXPECTED(
+              AppendDataFile(array, data_file_type, *entry.data_file));
+        } else {
+          return InvalidManifest("Missing required data_file field from manifest entry.");
+        }
+        break;
+      case 3: {
+        // sequence_number (optional int64)
+        ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(entry));
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(array, sequence_num));
+        break;
+      }
+      case 4:  // file_sequence_number (optional int64)
+        ICEBERG_RETURN_UNEXPECTED(
+            append_optional_int(array, entry.file_sequence_number));
+        break;
+      default:
+        return InvalidManifest("Unknown manifest entry field id: {}", field.field_id());
+    }
+  }
+
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array_));
+  size_++;
+  return {};
+}
+
+/// \brief Build `manifest_schema_` and its Arrow counterpart `schema_`.
+///
+/// Top-level manifest entry fields are kept only when their id is listed in
+/// `fields_ids`. The data_file field (id 2) is always kept as a required
+/// struct whose children are filtered by `fields_ids` in the same way.
+Status ManifestEntryAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
+  ICEBERG_ASSIGN_OR_RAISE(auto manifest_entry_type, GetManifestEntryType());
+  const auto entry_fields = manifest_entry_type->fields();
+  std::vector<SchemaField> selected;
+  // TODO(xiao.dong) Make this a common function to recursively handle
+  // all nested fields in the schema
+  for (const auto& field : entry_fields) {
+    if (field.field_id() != 2) {
+      if (fields_ids.contains(field.field_id())) {
+        selected.emplace_back(field);
+      }
+      continue;
+    }
+    // data_file: rebuild the nested struct from the requested children only.
+    const auto data_file_struct =
+        internal::checked_pointer_cast<StructType>(field.type());
+    std::vector<SchemaField> kept_children;
+    for (const auto& nested : data_file_struct->fields()) {
+      if (fields_ids.contains(nested.field_id())) {
+        kept_children.emplace_back(nested);
+      }
+    }
+    auto pruned_type = std::make_shared<StructType>(kept_children);
+    selected.emplace_back(SchemaField::MakeRequired(
+        field.field_id(), std::string(field.name()), std::move(pruned_type)));
+  }
+  manifest_schema_ = std::make_shared<Schema>(selected);
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_schema_, &schema_));
+  return {};
+}
+
+/// \brief Release the Arrow array and schema if they still hold resources.
+ManifestFileAdapter::~ManifestFileAdapter() {
+  // A null `release` callback means there is nothing left to release.
+  if (array_.release) {
+    ArrowArrayRelease(&array_);
+  }
+  if (schema_.release) {
+    ArrowSchemaRelease(&schema_);
+  }
+}
+
+/// \brief Append the partition field summaries of one manifest file as a
+/// single list element.
+///
+/// \param array List array whose only child is the summary struct array.
+/// \param summary_type List type whose first field holds the summary struct.
+/// \param summaries Summaries to append, one struct element each.
+/// \return InvalidManifestList when the struct array does not have exactly four
+/// children or a summary field id is unknown; otherwise the first append error.
+Status ManifestFileAdapter::AppendPartitionSummary(
+    ArrowArray* array, const std::shared_ptr<ListType>& summary_type,
+    const std::vector<PartitionFieldSummary>& summaries) {
+  ArrowArray* const struct_array = array->children[0];
+  if (struct_array->n_children != 4) {
+    return InvalidManifestList("Invalid partition summary array.");
+  }
+
+  // Appends a bound when it is present and non-empty, otherwise a null.
+  const auto append_bound = [](ArrowArray* target, const auto& bound) -> Status {
+    if (!bound.has_value() || bound->empty()) {
+      ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(target, 1));
+      return {};
+    }
+    return AppendField(target, bound.value());
+  };
+
+  const auto element_struct =
+      internal::checked_pointer_cast<StructType>(summary_type->fields()[0].type());
+  const auto element_fields = element_struct->fields();
+  for (const auto& summary : summaries) {
+    for (const auto& element_field : element_fields) {
+      // Each field id is written to a fixed child position of the struct.
+      switch (element_field.field_id()) {
+        case 509:  // contains_null (required bool)
+          ICEBERG_RETURN_UNEXPECTED(
+              AppendField(struct_array->children[0],
+                          static_cast<uint64_t>(summary.contains_null ? 1 : 0)));
+          break;
+        case 518: {
+          // contains_nan (optional bool)
+          ArrowArray* const nan_array = struct_array->children[1];
+          if (!summary.contains_nan.has_value()) {
+            ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(nan_array, 1));
+          } else {
+            ICEBERG_RETURN_UNEXPECTED(AppendField(
+                nan_array,
+                static_cast<uint64_t>(summary.contains_nan.value() ? 1 : 0)));
+          }
+          break;
+        }
+        case 510:  // lower_bound (optional binary)
+          ICEBERG_RETURN_UNEXPECTED(
+              append_bound(struct_array->children[2], summary.lower_bound));
+          break;
+        case 511:  // upper_bound (optional binary)
+          ICEBERG_RETURN_UNEXPECTED(
+              append_bound(struct_array->children[3], summary.upper_bound));
+          break;
+        default:
+          return InvalidManifestList("Unknown field id: {}", element_field.field_id());
+      }
+    }
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(struct_array));
+  }
+
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(array));
+  return {};
+}
+
+// Default field hooks: each returns the corresponding member unchanged.
+// NOTE(review): presumably these are overridden by version-specific adapters;
+// confirm against the declarations in manifest_adapter.h.
+
+/// \brief Sequence number to write for `file`.
+Result<int64_t> ManifestFileAdapter::GetSequenceNumber(const ManifestFile& file) const {
+  const auto& sequence_number = file.sequence_number;
+  return sequence_number;
+}
+
+/// \brief Minimum sequence number to write for `file`.
+Result<int64_t> ManifestFileAdapter::GetMinSequenceNumber(
+    const ManifestFile& file) const {
+  const auto& min_sequence_number = file.min_sequence_number;
+  return min_sequence_number;
+}
+
+/// \brief First row id to write for `file`.
+Result<std::optional<int64_t>> ManifestFileAdapter::GetFirstRowId(
+    const ManifestFile& file) const {
+  const auto& first_row_id = file.first_row_id;
+  return first_row_id;
+}
+
+/// \brief Append one manifest file as a new row of `array_`.
+///
+/// Walks the fields of `manifest_list_schema_` in order and appends the
+/// matching member of `file` to the child of `array_` at the same position.
+/// On success the row is finished and `size_` is incremented.
+///
+/// \return InvalidArrowData when the child count of `array_` does not match
+/// the schema; InvalidManifestList for an unknown field id; otherwise the
+/// first append error, if any.
+Status ManifestFileAdapter::AppendInternal(const ManifestFile& file) {
+  const auto& fields = manifest_list_schema_->fields();
+  const size_t field_count = fields.size();
+  // Guard the positional child access below against a mismatching array.
+  if (array_.n_children < 0 ||
+      static_cast<size_t>(array_.n_children) != field_count) [[unlikely]] {
+    return InvalidArrowData("Arrow array of manifest list does not match its schema.");
+  }
+
+  // Appends the value when present, otherwise a single null. 32-bit optionals
+  // are widened implicitly when passed in.
+  const auto append_optional_int = [](ArrowArray* target,
+                                      std::optional<int64_t> maybe) -> Status {
+    if (maybe.has_value()) {
+      return AppendField(target, maybe.value());
+    }
+    ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(target, 1));
+    return {};
+  };
+
+  for (size_t i = 0; i < field_count; ++i) {
+    const auto& field = fields[i];
+    ArrowArray* const array = array_.children[i];
+    switch (field.field_id()) {
+      case 500:  // manifest_path
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_path));
+        break;
+      case 501:  // manifest_length
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_length));
+        break;
+      case 502:  // partition_spec_id
+        ICEBERG_RETURN_UNEXPECTED(
+            AppendField(array, static_cast<int64_t>(file.partition_spec_id)));
+        break;
+      case 517:  // content
+        ICEBERG_RETURN_UNEXPECTED(AppendField(
+            array, static_cast<int64_t>(static_cast<int32_t>(file.content))));
+        break;
+      case 515: {
+        // sequence_number
+        ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(file));
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num));
+        break;
+      }
+      case 516: {
+        // min_sequence_number
+        ICEBERG_ASSIGN_OR_RAISE(auto min_sequence_num, GetMinSequenceNumber(file));
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, min_sequence_num));
+        break;
+      }
+      case 503:  // added_snapshot_id
+        ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.added_snapshot_id));
+        break;
+      case 504:  // added_files_count (optional)
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(array, file.added_files_count));
+        break;
+      case 505:  // existing_files_count (optional)
+        ICEBERG_RETURN_UNEXPECTED(
+            append_optional_int(array, file.existing_files_count));
+        break;
+      case 506:  // deleted_files_count (optional)
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(array, file.deleted_files_count));
+        break;
+      case 512:  // added_rows_count (optional)
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(array, file.added_rows_count));
+        break;
+      case 513:  // existing_rows_count (optional)
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(array, file.existing_rows_count));
+        break;
+      case 514:  // deleted_rows_count (optional)
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(array, file.deleted_rows_count));
+        break;
+      case 507:  // partitions
+        ICEBERG_RETURN_UNEXPECTED(AppendPartitionSummary(
+            array, internal::checked_pointer_cast<ListType>(field.type()),
+            file.partitions));
+        break;
+      case 519:  // key_metadata
+        // Empty key metadata is written as null.
+        if (!file.key_metadata.empty()) {
+          ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.key_metadata));
+        } else {
+          ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
+        }
+        break;
+      case 520: {
+        // first_row_id (optional)
+        ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file));
+        ICEBERG_RETURN_UNEXPECTED(append_optional_int(array, first_row_id));
+        break;
+      }
+      default:
+        return InvalidManifestList("Unknown field id: {}", field.field_id());
+    }
+  }
+  ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayFinishElement(&array_));
+  size_++;
+  return {};
+}
+
+/// \brief Build `manifest_list_schema_` and its Arrow counterpart `schema_`.
+///
+/// Keeps the fields of `ManifestFile::Type()` whose id is listed in
+/// `fields_ids`, preserving their original order.
+Status ManifestFileAdapter::InitSchema(const std::unordered_set<int32_t>& fields_ids) {
+  // Bind the type first so it stays alive for the whole loop.
+  const auto& manifest_file_type = ManifestFile::Type();
+  std::vector<SchemaField> selected;
+  for (const auto& candidate : manifest_file_type.fields()) {
+    if (!fields_ids.contains(candidate.field_id())) {
+      continue;
+    }
+    selected.emplace_back(candidate);
+  }
+  manifest_list_schema_ = std::make_shared<Schema>(selected);
+  ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*manifest_list_schema_, &schema_));
+  return {};
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest_adapter.h b/src/iceberg/manifest_adapter.h
index 2ffe51b..7c2a8bb 100644
--- a/src/iceberg/manifest_adapter.h
+++ b/src/iceberg/manifest_adapter.h
@@ -19,8 +19,14 @@
#pragma once
-/// \file iceberg/metadata_adapter.h
-/// Base class of adapter for v1v2v3v4 metadata.
+/// \file iceberg/manifest_adapter.h
+/// Base class for adapters handling v1/v2/v3/v4 manifest metadata.
+
+#include <memory>
+#include <optional>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
#include "iceberg/arrow_c_data.h"
#include "iceberg/result.h"
@@ -28,39 +34,96 @@
namespace iceberg {
-// \brief Base class to append manifest metadata to Arrow array.
+/// \brief Base class for appending manifest metadata to Arrow arrays.
class ICEBERG_EXPORT ManifestAdapter {
public:
ManifestAdapter() = default;
virtual ~ManifestAdapter() = default;
+ virtual Status Init() = 0;
- virtual Status StartAppending() = 0;
- virtual Result<ArrowArray> FinishAppending() = 0;
+ Status StartAppending();
+ Result<ArrowArray*> FinishAppending();
int64_t size() const { return size_; }
protected:
ArrowArray array_;
+ // Arrow schema of manifest or manifest list depending on the subclass
+ ArrowSchema schema_;
+ // Number of appended elements in the array
int64_t size_ = 0;
+ std::unordered_map<std::string, std::string> metadata_;
};
-// \brief Implemented by different versions with different schemas to
-// append a list of `ManifestEntry`s to an `ArrowArray`.
+/// \brief Adapter for appending a list of `ManifestEntry`s to an `ArrowArray`.
+/// Implemented by different versions with version-specific schemas.
class ICEBERG_EXPORT ManifestEntryAdapter : public ManifestAdapter {
public:
- ManifestEntryAdapter() = default;
- ~ManifestEntryAdapter() override = default;
+ explicit ManifestEntryAdapter(std::shared_ptr<PartitionSpec> partition_spec)
+ : partition_spec_(std::move(partition_spec)) {}
+ ~ManifestEntryAdapter() override;
virtual Status Append(const ManifestEntry& entry) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_schema_; }
+
+ protected:
+ virtual Result<std::shared_ptr<StructType>> GetManifestEntryType();
+
+ /// \brief Initialize version-specific schema.
+ ///
+ /// \param fields_ids Field IDs to include in the manifest schema. The schema will be
+ /// initialized to include only the fields with these IDs.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestEntry& entry);
+ Status AppendDataFile(ArrowArray* array,
+ const std::shared_ptr<StructType>& data_file_type,
+ const DataFile& file);
+ static Status AppendPartitionValues(ArrowArray* array,
+ const std::shared_ptr<StructType>& partition_type,
+ const std::vector<Literal>& partition_values);
+
+ virtual Result<std::optional<int64_t>> GetSequenceNumber(
+ const ManifestEntry& entry) const;
+ virtual Result<std::optional<std::string>> GetReferenceDataFile(
+ const DataFile& file) const;
+ virtual Result<std::optional<int64_t>> GetFirstRowId(const DataFile& file) const;
+ virtual Result<std::optional<int64_t>> GetContentOffset(const DataFile& file) const;
+ virtual Result<std::optional<int64_t>> GetContentSizeInBytes(
+ const DataFile& file) const;
+
+ protected:
+ std::shared_ptr<PartitionSpec> partition_spec_;
+ std::shared_ptr<Schema> manifest_schema_;
};
-// \brief Implemented by different versions with different schemas to
-// append a list of `ManifestFile`s to an `ArrowArray`.
+/// \brief Adapter for appending a list of `ManifestFile`s to an `ArrowArray`.
+/// Implemented by different versions with version-specific schemas.
class ICEBERG_EXPORT ManifestFileAdapter : public ManifestAdapter {
public:
ManifestFileAdapter() = default;
- ~ManifestFileAdapter() override = default;
+ ~ManifestFileAdapter() override;
virtual Status Append(const ManifestFile& file) = 0;
+
+ const std::shared_ptr<Schema>& schema() const { return manifest_list_schema_; }
+
+ protected:
+ /// \brief Initialize version-specific schema.
+ ///
+ /// \param fields_ids Field IDs to include in the manifest list schema. The schema will
+ /// be initialized to include only the fields with these IDs.
+ Status InitSchema(const std::unordered_set<int32_t>& fields_ids);
+ Status AppendInternal(const ManifestFile& file);
+ static Status AppendPartitionSummary(
+ ArrowArray* array, const std::shared_ptr<ListType>& summary_type,
+ const std::vector<PartitionFieldSummary>& summaries);
+
+ virtual Result<int64_t> GetSequenceNumber(const ManifestFile& file) const;
+ virtual Result<int64_t> GetMinSequenceNumber(const ManifestFile& file) const;
+ virtual Result<std::optional<int64_t>> GetFirstRowId(const ManifestFile& file) const;
+
+ protected:
+ std::shared_ptr<Schema> manifest_list_schema_;
};
} // namespace iceberg
diff --git a/src/iceberg/manifest_entry.cc b/src/iceberg/manifest_entry.cc
index d387aad..5a963b5 100644
--- a/src/iceberg/manifest_entry.cc
+++ b/src/iceberg/manifest_entry.cc
@@ -44,7 +44,8 @@ std::shared_ptr<StructType> DataFile::Type(std::shared_ptr<StructType> partition
kContent,
kFilePath,
kFileFormat,
- SchemaField::MakeRequired(102, kPartitionField, std::move(partition_type)),
+ SchemaField::MakeRequired(kPartitionFieldId, kPartitionField,
+ std::move(partition_type)),
kRecordCount,
kFileSize,
kColumnSizes,
@@ -70,9 +71,10 @@ std::shared_ptr<StructType> ManifestEntry::TypeFromPartitionType(
std::shared_ptr<StructType> ManifestEntry::TypeFromDataFileType(
std::shared_ptr<StructType> datafile_type) {
- return std::make_shared<StructType>(std::vector<SchemaField>{
- kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber,
- SchemaField::MakeRequired(2, kDataFileField, std::move(datafile_type))});
+ return std::make_shared<StructType>(
+ std::vector<SchemaField>{kStatus, kSnapshotId, kSequenceNumber, kFileSequenceNumber,
+ SchemaField::MakeRequired(kDataFileFieldId, kDataFileField,
+ std::move(datafile_type))});
}
} // namespace iceberg
diff --git a/src/iceberg/manifest_entry.h b/src/iceberg/manifest_entry.h
index 0aa697a..2b9987b 100644
--- a/src/iceberg/manifest_entry.h
+++ b/src/iceberg/manifest_entry.h
@@ -183,6 +183,7 @@ struct ICEBERG_EXPORT DataFile {
100, "file_path", iceberg::string(), "Location URI with FS scheme");
inline static const SchemaField kFileFormat = SchemaField::MakeRequired(
101, "file_format", iceberg::string(), "File format name: avro, orc, or parquet");
+ inline static const int32_t kPartitionFieldId = 102;
inline static const std::string kPartitionField = "partition";
inline static const SchemaField kRecordCount = SchemaField::MakeRequired(
103, "record_count", iceberg::int64(), "Number of records in the file");
@@ -296,11 +297,12 @@ struct ICEBERG_EXPORT ManifestEntry {
SchemaField::MakeRequired(0, "status", iceberg::int32());
inline static const SchemaField kSnapshotId =
SchemaField::MakeOptional(1, "snapshot_id", iceberg::int64());
+ inline static const int32_t kDataFileFieldId = 2;
+ inline static const std::string kDataFileField = "data_file";
inline static const SchemaField kSequenceNumber =
SchemaField::MakeOptional(3, "sequence_number", iceberg::int64());
inline static const SchemaField kFileSequenceNumber =
SchemaField::MakeOptional(4, "file_sequence_number", iceberg::int64());
- inline static const std::string kDataFileField = "data_file";
bool operator==(const ManifestEntry& other) const;
diff --git a/src/iceberg/manifest_reader_internal.cc b/src/iceberg/manifest_reader_internal.cc
index feff82f..fece007 100644
--- a/src/iceberg/manifest_reader_internal.cc
+++ b/src/iceberg/manifest_reader_internal.cc
@@ -21,6 +21,7 @@
#include <nanoarrow/nanoarrow.h>
+#include "iceberg/arrow/nanoarrow_status_internal.h"
#include "iceberg/arrow_c_data_guard_internal.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest_entry.h"
@@ -32,11 +33,6 @@
namespace iceberg {
-#define NANOARROW_RETURN_IF_NOT_OK(status, error) \
- if (status != NANOARROW_OK) [[unlikely]] { \
- return InvalidArrowData("Nanoarrow error: {}", error.message); \
- }
-
#define PARSE_PRIMITIVE_FIELD(item, array_view, type) \
for (int64_t row_idx = 0; row_idx < array_view->length; row_idx++) { \
if (!ArrowArrayViewIsNull(array_view, row_idx)) { \
@@ -208,12 +204,12 @@ Result<std::vector<ManifestFile>> ParseManifestList(ArrowSchema* schema,
ArrowError error;
ArrowArrayView array_view;
auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
- NANOARROW_RETURN_IF_NOT_OK(status, error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
internal::ArrowArrayViewGuard view_guard(&array_view);
status = ArrowArrayViewSetArray(&array_view, array_in, &error);
- NANOARROW_RETURN_IF_NOT_OK(status, error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error);
- NANOARROW_RETURN_IF_NOT_OK(status, error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
std::vector<ManifestFile> manifest_files;
manifest_files.resize(array_in->length);
@@ -471,12 +467,12 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(ArrowSchema* schema,
ArrowError error;
ArrowArrayView array_view;
auto status = ArrowArrayViewInitFromSchema(&array_view, schema, &error);
- NANOARROW_RETURN_IF_NOT_OK(status, error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
internal::ArrowArrayViewGuard view_guard(&array_view);
status = ArrowArrayViewSetArray(&array_view, array_in, &error);
- NANOARROW_RETURN_IF_NOT_OK(status, error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
status = ArrowArrayViewValidate(&array_view, NANOARROW_VALIDATION_LEVEL_FULL, &error);
- NANOARROW_RETURN_IF_NOT_OK(status, error);
+ ICEBERG_NANOARROW_RETURN_UNEXPECTED_WITH_ERROR(status, error);
std::vector<ManifestEntry> manifest_entries;
manifest_entries.resize(array_in->length);
diff --git a/src/iceberg/manifest_writer.cc b/src/iceberg/manifest_writer.cc
index 27fd3f7..7bdfee7 100644
--- a/src/iceberg/manifest_writer.cc
+++ b/src/iceberg/manifest_writer.cc
@@ -50,7 +50,7 @@ Status ManifestWriter::Close() {
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
}
- return {};
+ return writer_->Close();
}
Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
@@ -66,48 +66,47 @@ Result<std::unique_ptr<Writer>> OpenFileWriter(std::string_view location,
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV1Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
- std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema) {
- // TODO(xiao.dong) parse v1 schema
- auto manifest_entry_schema =
- ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
- auto fields_span = manifest_entry_schema->fields();
- std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
- auto schema = std::make_shared<Schema>(fields);
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_location, schema, std::move(file_io)));
- auto adapter = std::make_unique<ManifestEntryAdapterV1>(snapshot_id, std::move(schema));
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec) {
+ auto adapter =
+ std::make_unique<ManifestEntryAdapterV1>(snapshot_id, std::move(partition_spec));
+ ICEBERG_RETURN_UNEXPECTED(adapter->Init());
+ ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
+
+ auto schema = adapter->schema();
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
- std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema) {
- // TODO(xiao.dong) parse v2 schema
- auto manifest_entry_schema =
- ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
- auto fields_span = manifest_entry_schema->fields();
- std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
- auto schema = std::make_shared<Schema>(fields);
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_location, schema, std::move(file_io)));
- auto adapter = std::make_unique<ManifestEntryAdapterV2>(snapshot_id, std::move(schema));
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec) {
+ auto adapter =
+ std::make_unique<ManifestEntryAdapterV2>(snapshot_id, std::move(partition_spec));
+ ICEBERG_RETURN_UNEXPECTED(adapter->Init());
+ ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
+
+ auto schema = adapter->schema();
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}
Result<std::unique_ptr<ManifestWriter>> ManifestWriter::MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<Schema> partition_schema) {
- // TODO(xiao.dong) parse v3 schema
- auto manifest_entry_schema =
- ManifestEntry::TypeFromPartitionType(std::move(partition_schema));
- auto fields_span = manifest_entry_schema->fields();
- std::vector<SchemaField> fields(fields_span.begin(), fields_span.end());
- auto schema = std::make_shared<Schema>(fields);
- ICEBERG_ASSIGN_OR_RAISE(auto writer,
- OpenFileWriter(manifest_location, schema, std::move(file_io)));
+ std::shared_ptr<PartitionSpec> partition_spec) {
auto adapter = std::make_unique<ManifestEntryAdapterV3>(snapshot_id, first_row_id,
- std::move(schema));
+ std::move(partition_spec));
+ ICEBERG_RETURN_UNEXPECTED(adapter->Init());
+ ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
+
+ auto schema = adapter->schema();
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer,
+ OpenFileWriter(manifest_location, std::move(schema), std::move(file_io)));
return std::make_unique<ManifestWriter>(std::move(writer), std::move(adapter));
}
@@ -132,20 +131,20 @@ Status ManifestListWriter::Close() {
ICEBERG_ASSIGN_OR_RAISE(auto array, adapter_->FinishAppending());
ICEBERG_RETURN_UNEXPECTED(writer_->Write(array));
}
- return {};
+ return writer_->Close();
}
Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV1Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
- // TODO(xiao.dong) parse v1 schema
- std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
- ManifestFile::Type().fields().end());
- auto schema = std::make_shared<Schema>(fields);
+ auto adapter = std::make_unique<ManifestFileAdapterV1>(snapshot_id, parent_snapshot_id);
+ ICEBERG_RETURN_UNEXPECTED(adapter->Init());
+ ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
+
+ auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
- auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
- auto adapter = std::make_unique<ManifestFileAdapterV1>(snapshot_id, parent_snapshot_id,
- std::move(schema));
+ auto writer,
+ OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}
@@ -153,14 +152,16 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV2Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
int64_t sequence_number, std::string_view manifest_list_location,
std::shared_ptr<FileIO> file_io) {
- // TODO(xiao.dong) parse v2 schema
- std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
- ManifestFile::Type().fields().end());
- auto schema = std::make_shared<Schema>(fields);
+ auto adapter = std::make_unique<ManifestFileAdapterV2>(snapshot_id, parent_snapshot_id,
+ sequence_number);
+ ICEBERG_RETURN_UNEXPECTED(adapter->Init());
+ ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
+
+ auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
- auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
- auto adapter = std::make_unique<ManifestFileAdapterV2>(
- snapshot_id, parent_snapshot_id, sequence_number, std::move(schema));
+ auto writer,
+ OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
+
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}
@@ -168,14 +169,15 @@ Result<std::unique_ptr<ManifestListWriter>> ManifestListWriter::MakeV3Writer(
int64_t snapshot_id, std::optional<int64_t> parent_snapshot_id,
int64_t sequence_number, std::optional<int64_t> first_row_id,
std::string_view manifest_list_location, std::shared_ptr<FileIO> file_io) {
- // TODO(xiao.dong) parse v3 schema
- std::vector<SchemaField> fields(ManifestFile::Type().fields().begin(),
- ManifestFile::Type().fields().end());
- auto schema = std::make_shared<Schema>(fields);
+ auto adapter = std::make_unique<ManifestFileAdapterV3>(snapshot_id, parent_snapshot_id,
+ sequence_number, first_row_id);
+ ICEBERG_RETURN_UNEXPECTED(adapter->Init());
+ ICEBERG_RETURN_UNEXPECTED(adapter->StartAppending());
+
+ auto schema = adapter->schema();
ICEBERG_ASSIGN_OR_RAISE(
- auto writer, OpenFileWriter(manifest_list_location, schema, std::move(file_io)));
- auto adapter = std::make_unique<ManifestFileAdapterV3>(
- snapshot_id, parent_snapshot_id, sequence_number, first_row_id, std::move(schema));
+ auto writer,
+ OpenFileWriter(manifest_list_location, std::move(schema), std::move(file_io)));
return std::make_unique<ManifestListWriter>(std::move(writer), std::move(adapter));
}
diff --git a/src/iceberg/manifest_writer.h b/src/iceberg/manifest_writer.h
index c9ec307..69e1912 100644
--- a/src/iceberg/manifest_writer.h
+++ b/src/iceberg/manifest_writer.h
@@ -39,7 +39,7 @@ class ICEBERG_EXPORT ManifestWriter {
std::unique_ptr<ManifestEntryAdapter> adapter)
: writer_(std::move(writer)), adapter_(std::move(adapter)) {}
- virtual ~ManifestWriter() = default;
+ ~ManifestWriter() = default;
/// \brief Write manifest entry to file.
/// \param entry Manifest entry to write.
@@ -58,30 +58,33 @@ class ICEBERG_EXPORT ManifestWriter {
/// \param snapshot_id ID of the snapshot.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
+ /// \param partition_spec Partition spec for the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV1Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
- std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema);
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec);
/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
+ /// \param partition_spec Partition spec for the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV2Writer(
std::optional<int64_t> snapshot_id, std::string_view manifest_location,
- std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> partition_schema);
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<PartitionSpec> partition_spec);
/// \brief Creates a writer for a manifest file.
/// \param snapshot_id ID of the snapshot.
/// \param first_row_id First row ID of the snapshot.
/// \param manifest_location Path to the manifest file.
/// \param file_io File IO implementation to use.
+ /// \param partition_spec Partition spec for the manifest.
/// \return A Result containing the writer or an error.
static Result<std::unique_ptr<ManifestWriter>> MakeV3Writer(
std::optional<int64_t> snapshot_id, std::optional<int64_t> first_row_id,
std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<Schema> partition_schema);
+ std::shared_ptr<PartitionSpec> partition_spec);
private:
static constexpr int64_t kBatchSize = 1024;
@@ -96,7 +99,7 @@ class ICEBERG_EXPORT ManifestListWriter {
std::unique_ptr<ManifestFileAdapter> adapter)
: writer_(std::move(writer)), adapter_(std::move(adapter)) {}
- virtual ~ManifestListWriter() = default;
+ ~ManifestListWriter() = default;
/// \brief Write manifest file to manifest list file.
/// \param file Manifest file to write.
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 318c055..df64ae0 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -51,6 +51,7 @@ iceberg_sources = files(
'file_writer.cc',
'inheritable_metadata.cc',
'json_internal.cc',
+ 'manifest_adapter.cc',
'manifest_entry.cc',
'manifest_list.cc',
'manifest_reader.cc',
@@ -83,6 +84,9 @@ iceberg_sources = files(
'util/murmurhash3_internal.cc',
'util/timepoint.cc',
'util/uuid.cc',
+ 'v1_metadata.cc',
+ 'v2_metadata.cc',
+ 'v3_metadata.cc',
)
# CRoaring does not export symbols, so on Windows it must
diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc
index c822daa..b503b94 100644
--- a/src/iceberg/parquet/parquet_data_util.cc
+++ b/src/iceberg/parquet/parquet_data_util.cc
@@ -23,7 +23,7 @@
#include <arrow/record_batch.h>
#include <arrow/type.h>
-#include "iceberg/arrow/arrow_error_transform_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/parquet/parquet_data_util_internal.h"
#include "iceberg/schema.h"
#include "iceberg/schema_util.h"
diff --git a/src/iceberg/parquet/parquet_reader.cc b/src/iceberg/parquet/parquet_reader.cc
index e57b98e..64373d0 100644
--- a/src/iceberg/parquet/parquet_reader.cc
+++ b/src/iceberg/parquet/parquet_reader.cc
@@ -32,8 +32,8 @@
#include <parquet/file_reader.h>
#include <parquet/properties.h>
-#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/parquet/parquet_data_util_internal.h"
#include "iceberg/parquet/parquet_register.h"
#include "iceberg/parquet/parquet_schema_util_internal.h"
diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc
index 2d2cd54..61f4671 100644
--- a/src/iceberg/parquet/parquet_writer.cc
+++ b/src/iceberg/parquet/parquet_writer.cc
@@ -29,8 +29,8 @@
#include <parquet/file_writer.h>
#include <parquet/properties.h>
-#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/schema_internal.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/macros.h"
@@ -76,9 +76,9 @@ class ParquetWriter::Impl {
return {};
}
- Status Write(ArrowArray array) {
+ Status Write(ArrowArray* array) {
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto batch,
- ::arrow::ImportRecordBatch(&array, arrow_schema_));
+ ::arrow::ImportRecordBatch(array, arrow_schema_));
ICEBERG_ARROW_RETURN_NOT_OK(writer_->WriteRecordBatch(*batch));
@@ -132,7 +132,7 @@ Status ParquetWriter::Open(const WriterOptions& options) {
return impl_->Open(options);
}
-Status ParquetWriter::Write(ArrowArray array) { return impl_->Write(array); }
+Status ParquetWriter::Write(ArrowArray* array) { return impl_->Write(array); }
Status ParquetWriter::Close() { return impl_->Close(); }
diff --git a/src/iceberg/parquet/parquet_writer.h b/src/iceberg/parquet/parquet_writer.h
index 5371f38..9be0a43 100644
--- a/src/iceberg/parquet/parquet_writer.h
+++ b/src/iceberg/parquet/parquet_writer.h
@@ -35,7 +35,7 @@ class ICEBERG_BUNDLE_EXPORT ParquetWriter : public Writer {
Status Close() final;
- Status Write(ArrowArray array) final;
+ Status Write(ArrowArray* array) final;
std::optional<Metrics> metrics() final;
diff --git a/src/iceberg/partition_spec.cc b/src/iceberg/partition_spec.cc
index 2b41950..3fa5d86 100644
--- a/src/iceberg/partition_spec.cc
+++ b/src/iceberg/partition_spec.cc
@@ -24,7 +24,10 @@
#include <ranges>
#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/transform.h"
#include "iceberg/util/formatter.h" // IWYU pragma: keep
+#include "iceberg/util/macros.h"
namespace iceberg {
@@ -57,6 +60,50 @@ int32_t PartitionSpec::spec_id() const { return spec_id_; }
std::span<const PartitionField> PartitionSpec::fields() const { return fields_; }
+Result<std::shared_ptr<StructType>> PartitionSpec::PartitionType() {
+ if (fields_.empty()) {
+ return nullptr;
+ }
+ {
+ std::scoped_lock<std::mutex> lock(mutex_);
+ if (partition_type_ != nullptr) {
+ return partition_type_;
+ }
+ }
+
+ std::vector<SchemaField> partition_fields;
+ for (const auto& partition_field : fields_) {
+ // Get the source field from the original schema by source_id
+ ICEBERG_ASSIGN_OR_RAISE(auto source_field,
+ schema_->FindFieldById(partition_field.source_id()));
+ if (!source_field.has_value()) {
+ // TODO(xiao.dong) when source field is missing,
+ // should return an error or just use UNKNOWN type
+ return InvalidSchema("Cannot find source field for partition field:{}",
+ partition_field.field_id());
+ }
+ auto source_field_type = source_field.value().get().type();
+ // Bind the transform to the source field type to get the result type
+ ICEBERG_ASSIGN_OR_RAISE(auto transform_function,
+ partition_field.transform()->Bind(source_field_type));
+
+ auto result_type = transform_function->ResultType();
+
+ // Create the partition field with the transform result type
+ // Partition fields are always optional (can be null)
+ partition_fields.emplace_back(partition_field.field_id(),
+ std::string(partition_field.name()),
+ std::move(result_type),
+ /*optional=*/true);
+ }
+
+ std::scoped_lock<std::mutex> lock(mutex_);
+ if (partition_type_ == nullptr) {
+ partition_type_ = std::make_shared<StructType>(std::move(partition_fields));
+ }
+ return partition_type_;
+}
+
std::string PartitionSpec::ToString() const {
std::string repr = std::format("partition_spec[spec_id<{}>,\n", spec_id_);
for (const auto& field : fields_) {
diff --git a/src/iceberg/partition_spec.h b/src/iceberg/partition_spec.h
index f105a27..5546922 100644
--- a/src/iceberg/partition_spec.h
+++ b/src/iceberg/partition_spec.h
@@ -23,6 +23,7 @@
/// Partition specs for Iceberg tables.
#include <cstdint>
+#include <mutex>
#include <optional>
#include <span>
#include <string>
@@ -30,6 +31,7 @@
#include "iceberg/iceberg_export.h"
#include "iceberg/partition_field.h"
+#include "iceberg/result.h"
#include "iceberg/util/formattable.h"
namespace iceberg {
@@ -62,11 +64,16 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
/// \brief Get the table schema
const std::shared_ptr<Schema>& schema() const;
+
/// \brief Get the spec ID.
int32_t spec_id() const;
- /// \brief Get a view of the partition fields.
+
+ /// \brief Get a list view of the partition fields.
std::span<const PartitionField> fields() const;
+ /// \brief Get the partition type.
+ Result<std::shared_ptr<StructType>> PartitionType();
+
std::string ToString() const override;
int32_t last_assigned_field_id() const { return last_assigned_field_id_; }
@@ -77,12 +84,16 @@ class ICEBERG_EXPORT PartitionSpec : public util::Formattable {
private:
/// \brief Compare two partition specs for equality.
- [[nodiscard]] bool Equals(const PartitionSpec& other) const;
+ bool Equals(const PartitionSpec& other) const;
std::shared_ptr<Schema> schema_;
const int32_t spec_id_;
std::vector<PartitionField> fields_;
int32_t last_assigned_field_id_;
+
+ // FIXME: use similar lazy initialization pattern as in StructType
+ std::mutex mutex_;
+ std::shared_ptr<StructType> partition_type_;
};
} // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index e687f97..7c62a2a 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -117,8 +117,8 @@ if(ICEBERG_BUILD_BUNDLE)
avro_test.cc
avro_schema_test.cc
avro_stream_test.cc
- manifest_list_reader_test.cc
- manifest_reader_test.cc
+ manifest_list_reader_writer_test.cc
+ manifest_reader_writer_test.cc
test_common.cc)
add_iceberg_test(arrow_test
diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc
index 4bc773c..2bd09f9 100644
--- a/src/iceberg/test/avro_test.cc
+++ b/src/iceberg/test/avro_test.cc
@@ -128,7 +128,7 @@ class AvroReaderTest : public TempFileTestBase {
{.path = temp_avro_file_, .schema = schema, .io = file_io_});
ASSERT_TRUE(writer_result.has_value());
auto writer = std::move(writer_result.value());
- ASSERT_THAT(writer->Write(arrow_array), IsOk());
+ ASSERT_THAT(writer->Write(&arrow_array), IsOk());
ASSERT_THAT(writer->Close(), IsOk());
auto file_info_result = local_fs_->GetFileInfo(temp_avro_file_);
diff --git a/src/iceberg/test/manifest_list_reader_test.cc b/src/iceberg/test/manifest_list_reader_writer_test.cc
similarity index 78%
rename from src/iceberg/test/manifest_list_reader_test.cc
rename to src/iceberg/test/manifest_list_reader_writer_test.cc
index 9fd6e4c..793dd33 100644
--- a/src/iceberg/test/manifest_list_reader_test.cc
+++ b/src/iceberg/test/manifest_list_reader_writer_test.cc
@@ -18,7 +18,6 @@
*/
#include <arrow/filesystem/localfs.h>
-#include <avro/GenericDatum.hh>
#include <gtest/gtest.h>
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
@@ -26,12 +25,14 @@
#include "iceberg/expression/literal.h"
#include "iceberg/manifest_list.h"
#include "iceberg/manifest_reader.h"
+#include "iceberg/manifest_writer.h"
+#include "matchers.h"
#include "temp_file_test_base.h"
#include "test_common.h"
namespace iceberg {
-class ManifestListReaderTestBase : public TempFileTestBase {
+class ManifestListReaderWriterTestBase : public TempFileTestBase {
protected:
static void SetUpTestSuite() { avro::RegisterAll(); }
@@ -44,6 +45,11 @@ class ManifestListReaderTestBase : public TempFileTestBase {
void TestManifestListReading(const std::string& resource_name,
const std::vector<ManifestFile>& expected_manifest_list) {
std::string path = GetResourcePath(resource_name);
+ TestManifestListReadingByPath(path, expected_manifest_list);
+ }
+
+ void TestManifestListReadingByPath(
+ const std::string& path, const std::vector<ManifestFile>& expected_manifest_list) {
auto manifest_reader_result = ManifestListReader::Make(path, file_io_);
ASSERT_EQ(manifest_reader_result.has_value(), true);
@@ -66,7 +72,7 @@ class ManifestListReaderTestBase : public TempFileTestBase {
std::shared_ptr<FileIO> file_io_;
};
-class ManifestListReaderV1Test : public ManifestListReaderTestBase {
+class ManifestListReaderWriterV1Test : public ManifestListReaderWriterTestBase {
protected:
std::vector<ManifestFile> PreparePartitionedTestData() {
std::vector<std::string> paths = {
@@ -202,9 +208,20 @@ class ManifestListReaderV1Test : public ManifestListReaderTestBase {
.lower_bound = lower_bounds[3],
.upper_bound = upper_bounds[3]}}}};
}
+
+ void TestWriteManifestList(const std::string& manifest_list_path,
+ const std::vector<ManifestFile>& manifest_files) {
+ auto result = ManifestListWriter::MakeV1Writer(1, 0, manifest_list_path, file_io_);
+ ASSERT_TRUE(result.has_value()) << result.error().message;
+ auto writer = std::move(result.value());
+ auto status = writer->AddAll(manifest_files);
+ EXPECT_THAT(status, IsOk());
+ status = writer->Close();
+ EXPECT_THAT(status, IsOk());
+ }
};
-class ManifestListReaderV2Test : public ManifestListReaderTestBase {
+class ManifestListReaderWriterV2Test : public ManifestListReaderWriterTestBase {
protected:
std::vector<ManifestFile> PreparePartitionedTestData() {
std::vector<ManifestFile> manifest_files;
@@ -280,39 +297,71 @@ class ManifestListReaderV2Test : public ManifestListReaderTestBase {
}
return manifest_files;
}
+
+ void TestWriteManifestList(const std::string& manifest_list_path,
+ const std::vector<ManifestFile>& manifest_files) {
+ auto result = ManifestListWriter::MakeV2Writer(1, 0, 4, manifest_list_path, file_io_);
+ ASSERT_TRUE(result.has_value()) << result.error().message;
+ auto writer = std::move(result.value());
+ auto status = writer->AddAll(manifest_files);
+ EXPECT_THAT(status, IsOk());
+ status = writer->Close();
+ EXPECT_THAT(status, IsOk());
+ }
};
// V1 Tests
-TEST_F(ManifestListReaderV1Test, PartitionedTest) {
+TEST_F(ManifestListReaderWriterV1Test, PartitionedTest) {
auto expected_manifest_list = PreparePartitionedTestData();
TestManifestListReading(
"snap-7532614258660258098-1-eafd2972-f58e-4185-9237-6378f564787e.avro",
expected_manifest_list);
}
-TEST_F(ManifestListReaderV1Test, ComplexTypeTest) {
+TEST_F(ManifestListReaderWriterV1Test, ComplexTypeTest) {
auto expected_manifest_list = PrepareComplexTypeTestData();
TestManifestListReading(
"snap-4134160420377642835-1-aeffe099-3bac-4011-bc17-5875210d8dc0.avro",
expected_manifest_list);
}
-TEST_F(ManifestListReaderV1Test, ComplexPartitionedTest) {
+TEST_F(ManifestListReaderWriterV1Test, ComplexPartitionedTest) {
auto expected_manifest_list = PrepareComplexPartitionedTestData();
TestManifestListReading(
"snap-7522296285847100621-1-5d690750-8fb4-4cd1-8ae7-85c7b39abe14.avro",
expected_manifest_list);
}
+// Round trip: write the partitioned V1 fixture to a fresh path, then read the
+// written manifest list back and compare it with the same fixture.
+TEST_F(ManifestListReaderWriterV1Test, WritePartitionedTest) {
+  auto manifests = PreparePartitionedTestData();
+  auto output_path = CreateNewTempFilePath();
+  TestWriteManifestList(output_path, manifests);
+  TestManifestListReadingByPath(output_path, manifests);
+}
+
+// Round trip: write the complex-type V1 fixture to a fresh path, then read the
+// written manifest list back and compare it with the same fixture.
+TEST_F(ManifestListReaderWriterV1Test, WriteComplexTypeTest) {
+  auto manifests = PrepareComplexTypeTestData();
+  auto output_path = CreateNewTempFilePath();
+  TestWriteManifestList(output_path, manifests);
+  TestManifestListReadingByPath(output_path, manifests);
+}
+
+// Round trip: write the complex partitioned V1 fixture to a fresh path, then
+// read the written manifest list back and compare it with the same fixture.
+TEST_F(ManifestListReaderWriterV1Test, WriteComplexPartitionedTest) {
+  auto manifests = PrepareComplexPartitionedTestData();
+  auto output_path = CreateNewTempFilePath();
+  TestWriteManifestList(output_path, manifests);
+  TestManifestListReadingByPath(output_path, manifests);
+}
+
// V2 Tests
-TEST_F(ManifestListReaderV2Test, PartitionedTest) {
+TEST_F(ManifestListReaderWriterV2Test, PartitionedTest) {
auto expected_manifest_list = PreparePartitionedTestData();
TestManifestListReading(
"snap-7412193043800610213-1-2bccd69e-d642-4816-bba0-261cd9bd0d93.avro",
expected_manifest_list);
}
-TEST_F(ManifestListReaderV2Test, NonPartitionedTest) {
+TEST_F(ManifestListReaderWriterV2Test, NonPartitionedTest) {
auto expected_manifest_list = PrepareNonPartitionedTestData();
TestManifestListReading(
"snap-251167482216575399-1-ccb6dbcb-0611-48da-be68-bd506ea63188.avro",
@@ -322,4 +371,21 @@ TEST_F(ManifestListReaderV2Test, NonPartitionedTest) {
TestNonPartitionedManifests(expected_manifest_list);
}
+// Round trip: write the partitioned V2 fixture to a fresh path, then read the
+// written manifest list back and compare it with the same fixture.
+TEST_F(ManifestListReaderWriterV2Test, WritePartitionedTest) {
+  auto manifests = PreparePartitionedTestData();
+  auto output_path = CreateNewTempFilePath();
+  TestWriteManifestList(output_path, manifests);
+  TestManifestListReadingByPath(output_path, manifests);
+}
+
+// Round trip: write the non-partitioned V2 fixture to a fresh path, then read
+// the written manifest list back and compare it with the same fixture.
+TEST_F(ManifestListReaderWriterV2Test, WriteNonPartitionedTest) {
+  auto manifests = PrepareNonPartitionedTestData();
+  auto output_path = CreateNewTempFilePath();
+  TestWriteManifestList(output_path, manifests);
+  TestManifestListReadingByPath(output_path, manifests);
+
+  // Extra check on the fixture itself: every manifest must be non-partitioned.
+  TestNonPartitionedManifests(manifests);
+}
+
} // namespace iceberg
diff --git a/src/iceberg/test/manifest_reader_test.cc
b/src/iceberg/test/manifest_reader_writer_test.cc
similarity index 74%
rename from src/iceberg/test/manifest_reader_test.cc
rename to src/iceberg/test/manifest_reader_writer_test.cc
index 7381b29..435b9bc 100644
--- a/src/iceberg/test/manifest_reader_test.cc
+++ b/src/iceberg/test/manifest_reader_writer_test.cc
@@ -17,8 +17,6 @@
* under the License.
*/
-#include "iceberg/manifest_reader.h"
-
#include <cstddef>
#include <arrow/filesystem/localfs.h>
@@ -28,7 +26,11 @@
#include "iceberg/avro/avro_register.h"
#include "iceberg/manifest_entry.h"
#include "iceberg/manifest_list.h"
+#include "iceberg/manifest_reader.h"
+#include "iceberg/manifest_writer.h"
#include "iceberg/schema.h"
+#include "iceberg/transform.h"
+#include "matchers.h"
#include "temp_file_test_base.h"
#include "test_common.h"
@@ -48,6 +50,12 @@ class ManifestReaderTestBase : public TempFileTestBase {
const std::vector<ManifestEntry>& expected_entries,
std::shared_ptr<Schema> partition_schema = nullptr)
{
std::string path = GetResourcePath(resource_name);
+ TestManifestReadingByPath(path, expected_entries, partition_schema);
+ }
+
+ void TestManifestReadingByPath(const std::string& path,
+ const std::vector<ManifestEntry>&
expected_entries,
+ std::shared_ptr<Schema> partition_schema =
nullptr) {
auto manifest_reader_result = ManifestReader::Make(path, file_io_,
partition_schema);
ASSERT_TRUE(manifest_reader_result.has_value())
<< manifest_reader_result.error().message;
@@ -142,9 +150,23 @@ class ManifestReaderV1Test : public ManifestReaderTestBase
{
}
return manifest_entries;
}
+
+  /// Writes `manifest_entries` to the file at `manifest_list_path` through a V1
+  /// ManifestWriter built with `partition_spec`. Stops with a fatal failure if
+  /// the writer cannot be created; a failing AddAll() or Close() is reported as
+  /// a non-fatal failure.
+  void TestWriteManifest(const std::string& manifest_list_path,
+                         std::shared_ptr<PartitionSpec> partition_spec,
+                         const std::vector<ManifestEntry>& manifest_entries) {
+    auto writer_result = ManifestWriter::MakeV1Writer(
+        1, manifest_list_path, file_io_, std::move(partition_spec));
+    ASSERT_TRUE(writer_result.has_value()) << writer_result.error().message;
+    auto writer = std::move(writer_result.value());
+    EXPECT_THAT(writer->AddAll(manifest_entries), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+  }
};
TEST_F(ManifestReaderV1Test, PartitionedTest) {
+ // TODO(xiao.dong) we need to add more cases for different partition types
iceberg::SchemaField partition_field(1000, "order_ts_hour",
iceberg::int32(), true);
auto partition_schema =
std::make_shared<Schema>(std::vector<SchemaField>({partition_field}));
@@ -153,6 +175,23 @@ TEST_F(ManifestReaderV1Test, PartitionedTest) {
partition_schema);
}
+// Round trip: write the partitioned V1 fixture with an identity partition spec,
+// then read the written manifest back using the matching partition schema.
+TEST_F(ManifestReaderV1Test, WritePartitionedTest) {
+  // Table column (id 1) and the partition field (id 1000) that refers to it.
+  iceberg::SchemaField source_field(1, "order_ts_hour_source", iceberg::int32(), true);
+  iceberg::SchemaField hour_field(1000, "order_ts_hour", iceberg::int32(), true);
+  auto table_schema = std::make_shared<Schema>(std::vector<SchemaField>{source_field});
+  auto partition_schema =
+      std::make_shared<Schema>(std::vector<SchemaField>{hour_field});
+
+  auto identity = Transform::Identity();
+  std::vector<PartitionField> partition_fields{
+      PartitionField(1, 1000, "order_ts_hour", identity)};
+  auto partition_spec =
+      std::make_shared<PartitionSpec>(table_schema, 1, partition_fields);
+
+  auto entries = PreparePartitionedTestData();
+  auto output_path = CreateNewTempFilePath();
+  TestWriteManifest(output_path, partition_spec, entries);
+  TestManifestReadingByPath(output_path, entries, partition_schema);
+}
+
class ManifestReaderV2Test : public ManifestReaderTestBase {
protected:
std::vector<ManifestEntry> CreateV2TestData(
@@ -218,6 +257,19 @@ class ManifestReaderV2Test : public ManifestReaderTestBase
{
std::vector<ManifestEntry> PrepareMetadataInheritanceTestData() {
return CreateV2TestData(/*sequence_number=*/15, /*partition_spec_id*/ 12);
}
+
+  /// Writes `manifest_entries` to the file at `manifest_list_path` through a V2
+  /// ManifestWriter created for `snapshot_id` and `partition_spec`. Stops with
+  /// a fatal failure if the writer cannot be created; a failing AddAll() or
+  /// Close() is reported as a non-fatal failure.
+  void TestWriteManifest(int64_t snapshot_id, const std::string& manifest_list_path,
+                         std::shared_ptr<PartitionSpec> partition_spec,
+                         const std::vector<ManifestEntry>& manifest_entries) {
+    auto writer_result = ManifestWriter::MakeV2Writer(
+        snapshot_id, manifest_list_path, file_io_, std::move(partition_spec));
+    ASSERT_TRUE(writer_result.has_value()) << writer_result.error().message;
+    auto writer = std::move(writer_result.value());
+    EXPECT_THAT(writer->AddAll(manifest_entries), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+  }
};
TEST_F(ManifestReaderV2Test, NonPartitionedTest) {
@@ -239,4 +291,26 @@ TEST_F(ManifestReaderV2Test, MetadataInheritanceTest) {
TestManifestReadingWithManifestFile(manifest_file, expected_entries);
}
+// Round trip: write the non-partitioned V2 fixture without a partition spec,
+// then read the written manifest back and compare it with the same fixture.
+TEST_F(ManifestReaderV2Test, WriteNonPartitionedTest) {
+  auto entries = PrepareNonPartitionedTestData();
+  auto output_path = CreateNewTempFilePath();
+  TestWriteManifest(679879563479918846LL, output_path, nullptr, entries);
+  TestManifestReadingByPath(output_path, entries);
+}
+
+// Writes the metadata-inheritance fixture, then reads it back through a
+// ManifestFile descriptor carrying the spec id, sequence number and snapshot id
+// used by that fixture.
+TEST_F(ManifestReaderV2Test, WriteInheritancePartitionedTest) {
+  auto entries = PrepareMetadataInheritanceTestData();
+  auto output_path = CreateNewTempFilePath();
+  TestWriteManifest(679879563479918846LL, output_path, nullptr, entries);
+
+  ManifestFile written_manifest{
+      .manifest_path = output_path,
+      .manifest_length = 100,
+      .partition_spec_id = 12,
+      .content = ManifestFile::Content::kData,
+      .sequence_number = 15,
+      .added_snapshot_id = 679879563479918846LL,
+  };
+  TestManifestReadingWithManifestFile(written_manifest, entries);
+}
+
} // namespace iceberg
diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc
index 0c42b84..c6df838 100644
--- a/src/iceberg/test/parquet_test.cc
+++ b/src/iceberg/test/parquet_test.cc
@@ -30,8 +30,8 @@
#include <parquet/arrow/writer.h>
#include <parquet/metadata.h>
-#include "iceberg/arrow/arrow_error_transform_internal.h"
#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
#include "iceberg/file_reader.h"
#include "iceberg/file_writer.h"
#include "iceberg/parquet/parquet_register.h"
@@ -51,7 +51,7 @@ namespace {
Status WriteArray(std::shared_ptr<::arrow::Array> data, Writer& writer) {
ArrowArray arr;
ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*data, &arr));
- ICEBERG_RETURN_UNEXPECTED(writer.Write(arr));
+ ICEBERG_RETURN_UNEXPECTED(writer.Write(&arr));
return writer.Close();
}
diff --git a/src/iceberg/test/partition_spec_test.cc
b/src/iceberg/test/partition_spec_test.cc
index 871bb08..538d89a 100644
--- a/src/iceberg/test/partition_spec_test.cc
+++ b/src/iceberg/test/partition_spec_test.cc
@@ -87,4 +87,23 @@ TEST(PartitionSpecTest, Equality) {
ASSERT_NE(schema1, schema6);
ASSERT_NE(schema6, schema1);
}
+
+// PartitionType() must expose one field per partition field, carrying the
+// partition field's name and field id, in declaration order.
+TEST(PartitionSpecTest, PartitionSchemaTest) {
+  SchemaField ts_field(5, "ts", iceberg::timestamp(), true);
+  SchemaField bar_field(7, "bar", iceberg::string(), true);
+  auto const schema =
+      std::make_shared<Schema>(std::vector<SchemaField>{ts_field, bar_field}, 100);
+
+  auto identity = Transform::Identity();
+  PartitionField day_field(5, 1000, "day", identity);
+  PartitionField hour_field(7, 1001, "hour", identity);
+  PartitionSpec spec(schema, 100, {day_field, hour_field});
+
+  auto partition_type = spec.PartitionType();
+  ASSERT_TRUE(partition_type.has_value());
+  ASSERT_EQ(2, partition_type.value()->fields().size());
+
+  EXPECT_EQ(day_field.name(), partition_type.value()->fields()[0].name());
+  EXPECT_EQ(day_field.field_id(), partition_type.value()->fields()[0].field_id());
+  EXPECT_EQ(hour_field.name(), partition_type.value()->fields()[1].name());
+  EXPECT_EQ(hour_field.field_id(), partition_type.value()->fields()[1].field_id());
+}
} // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 41061d3..bdc5c1e 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -128,6 +128,7 @@ struct DataFile;
struct ManifestEntry;
struct ManifestFile;
struct ManifestList;
+struct PartitionFieldSummary;
class ManifestListReader;
class ManifestListWriter;
diff --git a/src/iceberg/v1_metadata.cc b/src/iceberg/v1_metadata.cc
new file mode 100644
index 0000000..ba381f8
--- /dev/null
+++ b/src/iceberg/v1_metadata.cc
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/v1_metadata.h"
+
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// Fills in the V1 manifest metadata entries and initializes the schema from
+/// the field ids written for a V1 manifest entry.
+Status ManifestEntryAdapterV1::Init() {
+  // Field ids handed to InitSchema() for a V1 manifest entry.
+  static std::unordered_set<int32_t> kV1EntryFieldIds{
+      ManifestEntry::kStatus.field_id(),
+      ManifestEntry::kSnapshotId.field_id(),
+      ManifestEntry::kDataFileFieldId,
+      DataFile::kFilePath.field_id(),
+      DataFile::kFileFormat.field_id(),
+      DataFile::kPartitionFieldId,
+      DataFile::kRecordCount.field_id(),
+      DataFile::kFileSize.field_id(),
+      105,  // kBlockSizeInBytes field id
+      DataFile::kColumnSizes.field_id(),
+      DataFile::kValueCounts.field_id(),
+      DataFile::kNullValueCounts.field_id(),
+      DataFile::kNanValueCounts.field_id(),
+      DataFile::kLowerBounds.field_id(),
+      DataFile::kUpperBounds.field_id(),
+      DataFile::kKeyMetadata.field_id(),
+      DataFile::kSplitOffsets.field_id(),
+      DataFile::kSortOrderId.field_id(),
+  };
+
+  // TODO(xiao.dong) schema to json
+  metadata_["schema"] = "{}";
+  // TODO(xiao.dong) partition spec to json
+  metadata_["partition-spec"] = "{}";
+  // The spec id is only recorded when a partition spec was supplied.
+  if (partition_spec_) {
+    metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
+  }
+  metadata_["format-version"] = "1";
+
+  return InitSchema(kV1EntryFieldIds);
+}
+
+/// Appends `entry` by forwarding it unchanged to AppendInternal().
+Status ManifestEntryAdapterV1::Append(const ManifestEntry& entry) {
+  return AppendInternal(entry);
+}
+
+/// Builds the struct type of a V1 manifest entry: status, snapshot id and a
+/// nested data_file struct that includes the V1-only 'block_size_in_bytes'.
+///
+/// \return the manifest entry type, or the error reported while deriving the
+/// partition type from the partition spec.
+Result<std::shared_ptr<StructType>> ManifestEntryAdapterV1::GetManifestEntryType() {
+  // 'block_size_in_bytes' (ID 105) is a deprecated field that is REQUIRED
+  // in the v1 data_file schema for backward compatibility.
+  // Deprecated. Always write a default in v1. Do not write in v2 or v3.
+  static const SchemaField kBlockSizeInBytes = SchemaField::MakeRequired(
+      105, "block_size_in_bytes", int64(), "Block size in bytes");
+
+  // Init() accepts a null partition spec, so it must not be dereferenced
+  // blindly here; a missing spec is handled like the unpartitioned spec.
+  auto spec =
+      partition_spec_ != nullptr ? partition_spec_ : PartitionSpec::Unpartitioned();
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType());
+  if (!partition_type) {
+    partition_type = PartitionSpec::Unpartitioned()->schema();
+  }
+
+  // The partition and data_file field ids come from the same named constants
+  // that Init() registers, so the two cannot drift apart.
+  auto datafile_type = std::make_shared<StructType>(std::vector<SchemaField>{
+      DataFile::kFilePath, DataFile::kFileFormat,
+      SchemaField::MakeRequired(DataFile::kPartitionFieldId, DataFile::kPartitionField,
+                                std::move(partition_type)),
+      DataFile::kRecordCount, DataFile::kFileSize, kBlockSizeInBytes,
+      DataFile::kColumnSizes, DataFile::kValueCounts, DataFile::kNullValueCounts,
+      DataFile::kNanValueCounts, DataFile::kLowerBounds, DataFile::kUpperBounds,
+      DataFile::kKeyMetadata, DataFile::kSplitOffsets, DataFile::kSortOrderId});
+
+  return std::make_shared<StructType>(std::vector<SchemaField>{
+      ManifestEntry::kStatus, ManifestEntry::kSnapshotId,
+      SchemaField::MakeRequired(ManifestEntry::kDataFileFieldId,
+                                ManifestEntry::kDataFileField,
+                                std::move(datafile_type))});
+}
+
+/// Fills in the V1 manifest-list metadata entries and initializes the schema
+/// from the field ids written for a V1 manifest file.
+Status ManifestFileAdapterV1::Init() {
+  // Field ids handed to InitSchema() for a V1 manifest list.
+  static std::unordered_set<int32_t> kV1ManifestFileFieldIds{
+      ManifestFile::kManifestPath.field_id(),
+      ManifestFile::kManifestLength.field_id(),
+      ManifestFile::kPartitionSpecId.field_id(),
+      ManifestFile::kAddedSnapshotId.field_id(),
+      ManifestFile::kAddedFilesCount.field_id(),
+      ManifestFile::kExistingFilesCount.field_id(),
+      ManifestFile::kDeletedFilesCount.field_id(),
+      ManifestFile::kAddedRowsCount.field_id(),
+      ManifestFile::kExistingRowsCount.field_id(),
+      ManifestFile::kDeletedRowsCount.field_id(),
+      ManifestFile::kPartitions.field_id(),
+      ManifestFile::kKeyMetadata.field_id(),
+  };
+
+  metadata_["snapshot-id"] = std::to_string(snapshot_id_);
+  // A missing parent snapshot is written as the literal string "null".
+  if (parent_snapshot_id_.has_value()) {
+    metadata_["parent-snapshot-id"] = std::to_string(parent_snapshot_id_.value());
+  } else {
+    metadata_["parent-snapshot-id"] = "null";
+  }
+  metadata_["format-version"] = "1";
+
+  return InitSchema(kV1ManifestFileFieldIds);
+}
+
+/// Appends `file` to the manifest list. Only data manifests are accepted; any
+/// other content type yields an InvalidManifestList error.
+Status ManifestFileAdapterV1::Append(const ManifestFile& file) {
+  const bool is_data_manifest = file.content == ManifestFile::Content::kData;
+  if (!is_data_manifest) {
+    return InvalidManifestList("Cannot store delete manifests in a v1 table");
+  }
+  return AppendInternal(file);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/v1_metadata.h b/src/iceberg/v1_metadata.h
index 7e91da7..3c095a9 100644
--- a/src/iceberg/v1_metadata.h
+++ b/src/iceberg/v1_metadata.h
@@ -20,9 +20,6 @@
#pragma once
/// \file iceberg/v1_metadata.h
-
-#include <memory>
-
#include "iceberg/manifest_adapter.h"
namespace iceberg {
@@ -31,32 +28,29 @@ namespace iceberg {
class ManifestEntryAdapterV1 : public ManifestEntryAdapter {
public:
ManifestEntryAdapterV1(std::optional<int64_t> snapshot_id,
- std::shared_ptr<Schema> schema) {
- // TODO(xiao.dong): init v1 schema
- }
- Status StartAppending() override { return {}; }
- Status Append(const ManifestEntry& entry) override { return {}; }
- Result<ArrowArray> FinishAppending() override { return {}; }
+ std::shared_ptr<PartitionSpec> partition_spec)
+ : ManifestEntryAdapter(std::move(partition_spec)),
snapshot_id_(snapshot_id) {}
+ Status Init() override;
+ Status Append(const ManifestEntry& entry) override;
+
+ protected:
+ Result<std::shared_ptr<StructType>> GetManifestEntryType() override;
private:
- std::shared_ptr<Schema> manifest_schema_;
- ArrowSchema schema_; // converted from manifest_schema_
+ std::optional<int64_t> snapshot_id_;
};
/// \brief Adapter to convert V1 ManifestFile to `ArrowArray`.
class ManifestFileAdapterV1 : public ManifestFileAdapter {
public:
- ManifestFileAdapterV1(int64_t snapshot_id, std::optional<int64_t>
parent_snapshot_id,
- std::shared_ptr<Schema> schema) {
- // TODO(xiao.dong): init v1 schema
- }
- Status StartAppending() override { return {}; }
- Status Append(const ManifestFile& file) override { return {}; }
- Result<ArrowArray> FinishAppending() override { return {}; }
+ ManifestFileAdapterV1(int64_t snapshot_id, std::optional<int64_t>
parent_snapshot_id)
+ : snapshot_id_(snapshot_id), parent_snapshot_id_(parent_snapshot_id) {}
+ Status Init() override;
+ Status Append(const ManifestFile& file) override;
private:
- std::shared_ptr<Schema> manifest_list_schema_;
- ArrowSchema schema_; // converted from manifest_list_schema_
+ int64_t snapshot_id_;
+ std::optional<int64_t> parent_snapshot_id_;
};
} // namespace iceberg
diff --git a/src/iceberg/v2_metadata.cc b/src/iceberg/v2_metadata.cc
new file mode 100644
index 0000000..e2bbb91
--- /dev/null
+++ b/src/iceberg/v2_metadata.cc
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/v2_metadata.h"
+
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+
+namespace iceberg {
+
+/// Fills in the V2 manifest metadata entries and initializes the schema from
+/// the field ids written for a V2 manifest entry.
+Status ManifestEntryAdapterV2::Init() {
+  // Field ids handed to InitSchema() for a V2 manifest entry.
+  static std::unordered_set<int32_t> kV2EntryFieldIds{
+      ManifestEntry::kStatus.field_id(),
+      ManifestEntry::kSnapshotId.field_id(),
+      ManifestEntry::kSequenceNumber.field_id(),
+      ManifestEntry::kFileSequenceNumber.field_id(),
+      ManifestEntry::kDataFileFieldId,
+      DataFile::kContent.field_id(),
+      DataFile::kFilePath.field_id(),
+      DataFile::kFileFormat.field_id(),
+      DataFile::kPartitionFieldId,
+      DataFile::kRecordCount.field_id(),
+      DataFile::kFileSize.field_id(),
+      DataFile::kColumnSizes.field_id(),
+      DataFile::kValueCounts.field_id(),
+      DataFile::kNullValueCounts.field_id(),
+      DataFile::kNanValueCounts.field_id(),
+      DataFile::kLowerBounds.field_id(),
+      DataFile::kUpperBounds.field_id(),
+      DataFile::kKeyMetadata.field_id(),
+      DataFile::kSplitOffsets.field_id(),
+      DataFile::kEqualityIds.field_id(),
+      DataFile::kSortOrderId.field_id(),
+      DataFile::kReferencedDataFile.field_id(),
+  };
+
+  // TODO(xiao.dong) schema to json
+  metadata_["schema"] = "{}";
+  // TODO(xiao.dong) partition spec to json
+  metadata_["partition-spec"] = "{}";
+  // The spec id is only recorded when a partition spec was supplied.
+  if (partition_spec_) {
+    metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
+  }
+  metadata_["format-version"] = "2";
+  metadata_["content"] = "data";
+
+  return InitSchema(kV2EntryFieldIds);
+}
+
+/// Appends `entry` by forwarding it unchanged to AppendInternal().
+Status ManifestEntryAdapterV2::Append(const ManifestEntry& entry) {
+  return AppendInternal(entry);
+}
+
+/// Returns the data sequence number to write for `entry`.
+///
+/// An assigned sequence number is returned as-is. A null one is returned as
+/// std::nullopt (to be inherited) after checking that inheritance is valid;
+/// otherwise an InvalidManifest error is returned.
+Result<std::optional<int64_t>> ManifestEntryAdapterV2::GetSequenceNumber(
+    const ManifestEntry& entry) const {
+  if (entry.sequence_number.has_value()) {
+    return entry.sequence_number;
+  }
+
+  // A null data sequence number means the entry inherits the sequence number
+  // of the current commit. That is only valid when the entry's snapshot id is
+  // null (it is inherited too) or matches the snapshot id of this writer.
+  const bool from_other_snapshot =
+      entry.snapshot_id.has_value() && entry.snapshot_id.value() != snapshot_id_;
+  if (from_other_snapshot) {
+    return InvalidManifest(
+        "Found unassigned sequence number for an entry from snapshot: {}",
+        entry.snapshot_id.value());
+  }
+
+  // Inheritance only applies to ADDED entries.
+  if (entry.status != ManifestStatus::kAdded) {
+    return InvalidManifest(
+        "Only entries with status ADDED can have null sequence number");
+  }
+
+  return std::nullopt;
+}
+
+/// Returns the referenced data file of `file` when it holds position deletes;
+/// every other content type yields std::nullopt.
+Result<std::optional<std::string>> ManifestEntryAdapterV2::GetReferenceDataFile(
+    const DataFile& file) const {
+  if (file.content != DataFile::Content::kPositionDeletes) {
+    return std::nullopt;
+  }
+  return file.referenced_data_file;
+}
+
+/// Fills in the V2 manifest-list metadata entries and initializes the schema
+/// from the field ids written for a V2 manifest file.
+Status ManifestFileAdapterV2::Init() {
+  // Field ids handed to InitSchema() for a V2 manifest list.
+  static std::unordered_set<int32_t> kV2ManifestFileFieldIds{
+      ManifestFile::kManifestPath.field_id(),
+      ManifestFile::kManifestLength.field_id(),
+      ManifestFile::kPartitionSpecId.field_id(),
+      ManifestFile::kContent.field_id(),
+      ManifestFile::kSequenceNumber.field_id(),
+      ManifestFile::kMinSequenceNumber.field_id(),
+      ManifestFile::kAddedSnapshotId.field_id(),
+      ManifestFile::kAddedFilesCount.field_id(),
+      ManifestFile::kExistingFilesCount.field_id(),
+      ManifestFile::kDeletedFilesCount.field_id(),
+      ManifestFile::kAddedRowsCount.field_id(),
+      ManifestFile::kExistingRowsCount.field_id(),
+      ManifestFile::kDeletedRowsCount.field_id(),
+      ManifestFile::kPartitions.field_id(),
+      ManifestFile::kKeyMetadata.field_id(),
+  };
+
+  metadata_["snapshot-id"] = std::to_string(snapshot_id_);
+  // A missing parent snapshot is written as the literal string "null".
+  if (parent_snapshot_id_.has_value()) {
+    metadata_["parent-snapshot-id"] = std::to_string(parent_snapshot_id_.value());
+  } else {
+    metadata_["parent-snapshot-id"] = "null";
+  }
+  metadata_["sequence-number"] = std::to_string(sequence_number_);
+  metadata_["format-version"] = "2";
+
+  return InitSchema(kV2ManifestFileFieldIds);
+}
+
+/// Appends `file` by forwarding it unchanged to AppendInternal().
+Status ManifestFileAdapterV2::Append(const ManifestFile& file) {
+  return AppendInternal(file);
+}
+
+/// Returns the sequence number to write for `file`.
+///
+/// A manifest whose sequence number is still unassigned
+/// (TableMetadata::kInvalidSequenceNumber) takes this writer's sequence
+/// number, which is only allowed when it was added by this writer's snapshot.
+///
+/// \return the sequence number, or an InvalidManifestList error when an
+/// unassigned manifest was added by a different snapshot.
+Result<int64_t> ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& file) const {
+  if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) {
+    if (snapshot_id_ != file.added_snapshot_id) {
+      // The error helpers are called with `{}` replacement fields everywhere
+      // else in this file; a printf-style "%s" would not be substituted and
+      // the snapshot id would be missing from the message.
+      return InvalidManifestList(
+          "Found unassigned sequence number for a manifest from snapshot: {}",
+          file.added_snapshot_id);
+    }
+    return sequence_number_;
+  }
+  return file.sequence_number;
+}
+
+/// Returns the minimum sequence number to write for `file`.
+///
+/// A manifest whose minimum sequence number is still unassigned
+/// (TableMetadata::kInvalidSequenceNumber) takes this writer's sequence
+/// number, which is only allowed when it was added by this writer's snapshot.
+///
+/// \return the minimum sequence number, or an InvalidManifestList error when
+/// an unassigned manifest was added by a different snapshot.
+Result<int64_t> ManifestFileAdapterV2::GetMinSequenceNumber(
+    const ManifestFile& file) const {
+  if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
+    if (snapshot_id_ != file.added_snapshot_id) {
+      // The error helpers are called with `{}` replacement fields everywhere
+      // else in this file; a printf-style "%s" would not be substituted and
+      // the snapshot id would be missing from the message.
+      return InvalidManifestList(
+          "Found unassigned sequence number for a manifest from snapshot: {}",
+          file.added_snapshot_id);
+    }
+    return sequence_number_;
+  }
+  return file.min_sequence_number;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/v2_metadata.h b/src/iceberg/v2_metadata.h
index d6ff6aa..164a497 100644
--- a/src/iceberg/v2_metadata.h
+++ b/src/iceberg/v2_metadata.h
@@ -21,8 +21,6 @@
/// \file iceberg/v2_metadata.h
-#include <memory>
-
#include "iceberg/manifest_adapter.h"
namespace iceberg {
@@ -31,32 +29,40 @@ namespace iceberg {
class ManifestEntryAdapterV2 : public ManifestEntryAdapter {
public:
ManifestEntryAdapterV2(std::optional<int64_t> snapshot_id,
- std::shared_ptr<Schema> schema) {
- // TODO(xiao.dong): init v2 schema
- }
- Status StartAppending() override { return {}; }
- Status Append(const ManifestEntry& entry) override { return {}; }
- Result<ArrowArray> FinishAppending() override { return {}; }
+ std::shared_ptr<PartitionSpec> partition_spec)
+ : ManifestEntryAdapter(std::move(partition_spec)),
snapshot_id_(snapshot_id) {}
+ Status Init() override;
+ Status Append(const ManifestEntry& entry) override;
+
+ protected:
+ Result<std::optional<int64_t>> GetSequenceNumber(
+ const ManifestEntry& entry) const override;
+ Result<std::optional<std::string>> GetReferenceDataFile(
+ const DataFile& file) const override;
private:
- std::shared_ptr<Schema> manifest_schema_;
- ArrowSchema schema_; // converted from manifest_schema_
+ std::optional<int64_t> snapshot_id_;
};
/// \brief Adapter to convert V2 ManifestFile to `ArrowArray`.
class ManifestFileAdapterV2 : public ManifestFileAdapter {
public:
ManifestFileAdapterV2(int64_t snapshot_id, std::optional<int64_t>
parent_snapshot_id,
- int64_t sequence_number, std::shared_ptr<Schema>
schema) {
- // TODO(xiao.dong): init v2 schema
- }
- Status StartAppending() override { return {}; }
- Status Append(const ManifestFile& file) override { return {}; }
- Result<ArrowArray> FinishAppending() override { return {}; }
+ int64_t sequence_number)
+ : snapshot_id_(snapshot_id),
+ parent_snapshot_id_(parent_snapshot_id),
+ sequence_number_(sequence_number) {}
+ Status Init() override;
+ Status Append(const ManifestFile& file) override;
+
+ protected:
+ Result<int64_t> GetSequenceNumber(const ManifestFile& file) const override;
+ Result<int64_t> GetMinSequenceNumber(const ManifestFile& file) const
override;
private:
- std::shared_ptr<Schema> manifest_list_schema_;
- ArrowSchema schema_; // converted from manifest_list_schema_
+ int64_t snapshot_id_;
+ std::optional<int64_t> parent_snapshot_id_;
+ int64_t sequence_number_;
};
} // namespace iceberg
diff --git a/src/iceberg/v3_metadata.cc b/src/iceberg/v3_metadata.cc
new file mode 100644
index 0000000..e460598
--- /dev/null
+++ b/src/iceberg/v3_metadata.cc
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/v3_metadata.h"
+
+#include "iceberg/manifest_entry.h"
+#include "iceberg/manifest_list.h"
+#include "iceberg/schema.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// Fills in the V3 manifest metadata entries and initializes the schema from
+/// the field ids written for a V3 manifest entry.
+Status ManifestEntryAdapterV3::Init() {
+  // Field ids handed to InitSchema() for a V3 manifest entry.
+  static std::unordered_set<int32_t> kV3EntryFieldIds{
+      ManifestEntry::kStatus.field_id(),
+      ManifestEntry::kSnapshotId.field_id(),
+      ManifestEntry::kDataFileFieldId,
+      ManifestEntry::kSequenceNumber.field_id(),
+      ManifestEntry::kFileSequenceNumber.field_id(),
+      DataFile::kContent.field_id(),
+      DataFile::kFilePath.field_id(),
+      DataFile::kFileFormat.field_id(),
+      DataFile::kPartitionFieldId,
+      DataFile::kRecordCount.field_id(),
+      DataFile::kFileSize.field_id(),
+      DataFile::kColumnSizes.field_id(),
+      DataFile::kValueCounts.field_id(),
+      DataFile::kNullValueCounts.field_id(),
+      DataFile::kNanValueCounts.field_id(),
+      DataFile::kLowerBounds.field_id(),
+      DataFile::kUpperBounds.field_id(),
+      DataFile::kKeyMetadata.field_id(),
+      DataFile::kSplitOffsets.field_id(),
+      DataFile::kEqualityIds.field_id(),
+      DataFile::kSortOrderId.field_id(),
+      DataFile::kFirstRowId.field_id(),
+      DataFile::kReferencedDataFile.field_id(),
+      DataFile::kContentOffset.field_id(),
+      DataFile::kContentSize.field_id(),
+  };
+
+  // TODO(xiao.dong) schema to json
+  metadata_["schema"] = "{}";
+  // TODO(xiao.dong) partition spec to json
+  metadata_["partition-spec"] = "{}";
+  // The spec id is only recorded when a partition spec was supplied.
+  if (partition_spec_) {
+    metadata_["partition-spec-id"] = std::to_string(partition_spec_->spec_id());
+  }
+  metadata_["format-version"] = "3";
+  metadata_["content"] = "data";
+
+  return InitSchema(kV3EntryFieldIds);
+}
+
+/// Appends `entry` by forwarding it unchanged to AppendInternal().
+Status ManifestEntryAdapterV3::Append(const ManifestEntry& entry) {
+  return AppendInternal(entry);
+}
+
+/// Returns the data sequence number to write for `entry`.
+///
+/// An assigned sequence number is returned as-is. A null one is returned as
+/// std::nullopt (to be inherited) after checking that inheritance is valid;
+/// otherwise an InvalidManifest error is returned.
+Result<std::optional<int64_t>> ManifestEntryAdapterV3::GetSequenceNumber(
+    const ManifestEntry& entry) const {
+  if (entry.sequence_number.has_value()) {
+    return entry.sequence_number;
+  }
+
+  // A null data sequence number means the entry inherits the sequence number
+  // of the current commit. That is only valid when the entry's snapshot id is
+  // null (it is inherited too) or matches the snapshot id of this writer.
+  const bool from_other_snapshot =
+      entry.snapshot_id.has_value() && entry.snapshot_id.value() != snapshot_id_;
+  if (from_other_snapshot) {
+    return InvalidManifest(
+        "Found unassigned sequence number for an entry from snapshot: {}",
+        entry.snapshot_id.value());
+  }
+
+  // Inheritance only applies to ADDED entries.
+  if (entry.status != ManifestStatus::kAdded) {
+    return InvalidManifest(
+        "Only entries with status ADDED can have null sequence number");
+  }
+
+  return std::nullopt;
+}
+
+/// Returns the referenced data file of `file` when it holds position deletes;
+/// every other content type yields std::nullopt.
+Result<std::optional<std::string>> ManifestEntryAdapterV3::GetReferenceDataFile(
+    const DataFile& file) const {
+  if (file.content != DataFile::Content::kPositionDeletes) {
+    return std::nullopt;
+  }
+  return file.referenced_data_file;
+}
+
+/// Returns the first row id of `file` when it is a data file; every other
+/// content type yields std::nullopt.
+Result<std::optional<int64_t>> ManifestEntryAdapterV3::GetFirstRowId(
+    const DataFile& file) const {
+  if (file.content != DataFile::Content::kData) {
+    return std::nullopt;
+  }
+  return file.first_row_id;
+}
+
+/// Returns the content offset of `file` when it holds position deletes; every
+/// other content type yields std::nullopt.
+Result<std::optional<int64_t>> ManifestEntryAdapterV3::GetContentOffset(
+    const DataFile& file) const {
+  if (file.content != DataFile::Content::kPositionDeletes) {
+    return std::nullopt;
+  }
+  return file.content_offset;
+}
+
+/// Returns the content size in bytes of `file` when it holds position deletes;
+/// every other content type yields std::nullopt.
+Result<std::optional<int64_t>> ManifestEntryAdapterV3::GetContentSizeInBytes(
+    const DataFile& file) const {
+  if (file.content != DataFile::Content::kPositionDeletes) {
+    return std::nullopt;
+  }
+  return file.content_size_in_bytes;
+}
+
+/// Fills in the V3 manifest-list metadata entries and initializes the schema
+/// from the field ids written for a V3 manifest file.
+Status ManifestFileAdapterV3::Init() {
+  // Field ids handed to InitSchema() for a V3 manifest list.
+  static std::unordered_set<int32_t> kV3ManifestFileFieldIds{
+      ManifestFile::kManifestPath.field_id(),
+      ManifestFile::kManifestLength.field_id(),
+      ManifestFile::kPartitionSpecId.field_id(),
+      ManifestFile::kContent.field_id(),
+      ManifestFile::kSequenceNumber.field_id(),
+      ManifestFile::kMinSequenceNumber.field_id(),
+      ManifestFile::kAddedSnapshotId.field_id(),
+      ManifestFile::kAddedFilesCount.field_id(),
+      ManifestFile::kExistingFilesCount.field_id(),
+      ManifestFile::kDeletedFilesCount.field_id(),
+      ManifestFile::kAddedRowsCount.field_id(),
+      ManifestFile::kExistingRowsCount.field_id(),
+      ManifestFile::kDeletedRowsCount.field_id(),
+      ManifestFile::kPartitions.field_id(),
+      ManifestFile::kKeyMetadata.field_id(),
+      ManifestFile::kFirstRowId.field_id(),
+  };
+
+  metadata_["snapshot-id"] = std::to_string(snapshot_id_);
+  // Missing optional ids are written as the literal string "null".
+  if (parent_snapshot_id_.has_value()) {
+    metadata_["parent-snapshot-id"] = std::to_string(parent_snapshot_id_.value());
+  } else {
+    metadata_["parent-snapshot-id"] = "null";
+  }
+  metadata_["sequence-number"] = std::to_string(sequence_number_);
+  if (next_row_id_.has_value()) {
+    metadata_["first-row-id"] = std::to_string(next_row_id_.value());
+  } else {
+    metadata_["first-row-id"] = "null";
+  }
+  metadata_["format-version"] = "3";
+
+  return InitSchema(kV3ManifestFileFieldIds);
+}
+
+/// Appends `file`, then advances the next row id past the manifest's rows.
+///
+/// The next row id only moves when the append succeeded, the manifest is a
+/// data manifest without its own first-row-id (see WrappedFirstRowId()), and a
+/// next row id is set. It grows by the existing plus added row counts, with a
+/// missing count treated as 0.
+Status ManifestFileAdapterV3::Append(const ManifestFile& file) {
+  ICEBERG_RETURN_UNEXPECTED(AppendInternal(file));
+  if (WrappedFirstRowId(file) && next_row_id_.has_value()) {
+    *next_row_id_ += file.existing_rows_count.value_or(0);
+    *next_row_id_ += file.added_rows_count.value_or(0);
+  }
+  return {};
+}
+
+/// Returns the sequence number to write for `file`.
+///
+/// A manifest whose sequence number is still unassigned
+/// (TableMetadata::kInvalidSequenceNumber) takes this writer's sequence
+/// number, which is only allowed when it was added by this writer's snapshot.
+///
+/// \return the sequence number, or an InvalidManifestList error when an
+/// unassigned manifest was added by a different snapshot.
+Result<int64_t> ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& file) const {
+  if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) {
+    if (snapshot_id_ != file.added_snapshot_id) {
+      // The error helpers are called with `{}` replacement fields everywhere
+      // else in this file; a printf-style "%s" would not be substituted and
+      // the snapshot id would be missing from the message.
+      return InvalidManifestList(
+          "Found unassigned sequence number for a manifest from snapshot: {}",
+          file.added_snapshot_id);
+    }
+    return sequence_number_;
+  }
+  return file.sequence_number;
+}
+
+/// Returns the minimum sequence number to write for `file`.
+///
+/// A manifest whose minimum sequence number is still unassigned
+/// (TableMetadata::kInvalidSequenceNumber) takes this writer's sequence
+/// number, which is only allowed when it was added by this writer's snapshot.
+///
+/// \return the minimum sequence number, or an InvalidManifestList error when
+/// an unassigned manifest was added by a different snapshot.
+Result<int64_t> ManifestFileAdapterV3::GetMinSequenceNumber(
+    const ManifestFile& file) const {
+  if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
+    if (snapshot_id_ != file.added_snapshot_id) {
+      // The error helpers are called with `{}` replacement fields everywhere
+      // else in this file; a printf-style "%s" would not be substituted and
+      // the snapshot id would be missing from the message.
+      return InvalidManifestList(
+          "Found unassigned sequence number for a manifest from snapshot: {}",
+          file.added_snapshot_id);
+    }
+    return sequence_number_;
+  }
+  return file.min_sequence_number;
+}
+
+/// Returns the first-row-id to write for `file`.
+///
+/// - Data manifest without a first-row-id: the writer's next row id.
+/// - Non-data manifest: std::nullopt.
+/// - Otherwise: the manifest's own first-row-id.
+Result<std::optional<int64_t>> ManifestFileAdapterV3::GetFirstRowId(
+    const ManifestFile& file) const {
+  if (WrappedFirstRowId(file)) {
+    return next_row_id_;
+  }
+  if (file.content != ManifestFile::Content::kData) {
+    return std::nullopt;
+  }
+  // Defensive check before reading the value.
+  if (!file.first_row_id.has_value()) {
+    return InvalidManifestList("Found unassigned first-row-id for file:{}",
+                               file.manifest_path);
+  }
+  return file.first_row_id.value();
+}
+
+/// True when `file` is a data manifest that has no first-row-id of its own.
+bool ManifestFileAdapterV3::WrappedFirstRowId(const ManifestFile& file) const {
+  const bool is_data_manifest = file.content == ManifestFile::Content::kData;
+  return is_data_manifest && !file.first_row_id.has_value();
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/v3_metadata.h b/src/iceberg/v3_metadata.h
index e7bcc35..a107610 100644
--- a/src/iceberg/v3_metadata.h
+++ b/src/iceberg/v3_metadata.h
@@ -21,44 +21,62 @@
/// \file iceberg/v3_metadata.h
-#include <memory>
-
#include "iceberg/manifest_adapter.h"
namespace iceberg {
-/// \brief Adapter to convert V3ManifestEntry to `ArrowArray`.
+/// \brief Adapter to convert V3 ManifestEntry to `ArrowArray`.
class ManifestEntryAdapterV3 : public ManifestEntryAdapter {
public:
+ /// \brief Construct the adapter: `partition_spec` is forwarded to the
+ /// `ManifestEntryAdapter` base, while `snapshot_id` and `first_row_id` are
+ /// stored in the members of the same name.
ManifestEntryAdapterV3(std::optional<int64_t> snapshot_id,
std::optional<int64_t> first_row_id,
- std::shared_ptr<Schema> schema) {
- // TODO(xiao.dong): init v3 schema
- }
- Status StartAppending() override { return {}; }
- Status Append(const ManifestEntry& entry) override { return {}; }
- Result<ArrowArray> FinishAppending() override { return {}; }
+ std::shared_ptr<PartitionSpec> partition_spec)
+ : ManifestEntryAdapter(std::move(partition_spec)),
+ snapshot_id_(snapshot_id),
+ first_row_id_(first_row_id) {}
+ // Init() and Append() are only declared here; they replace the inline stubs
+ // removed above and are defined out of line.
+ Status Init() override;
+ Status Append(const ManifestEntry& entry) override;
+
+ protected:
+ // Overrides of base-class accessors, declared only in this header.
+ // NOTE(review): presumably each supplies the V3-specific value of the
+ // corresponding manifest entry field - confirm against the implementation.
+ Result<std::optional<int64_t>> GetSequenceNumber(
+ const ManifestEntry& entry) const override;
+ Result<std::optional<std::string>> GetReferenceDataFile(
+ const DataFile& file) const override;
+ Result<std::optional<int64_t>> GetFirstRowId(const DataFile& file) const
override;
+ Result<std::optional<int64_t>> GetContentOffset(const DataFile& file) const
override;
+ Result<std::optional<int64_t>> GetContentSizeInBytes(
+ const DataFile& file) const override;
private:
- std::shared_ptr<Schema> manifest_schema_;
- ArrowSchema schema_; // converted from manifest_schema_
+ // Values captured verbatim from the constructor arguments.
+ std::optional<int64_t> snapshot_id_;
+ std::optional<int64_t> first_row_id_;
};
/// \brief Adapter to convert V3 ManifestFile to `ArrowArray`.
class ManifestFileAdapterV3 : public ManifestFileAdapter {
public:
+ /// \brief Construct the adapter, storing `snapshot_id`, `parent_snapshot_id`
+ /// and `sequence_number`; `first_row_id` becomes the initial `next_row_id_`.
ManifestFileAdapterV3(int64_t snapshot_id, std::optional<int64_t>
parent_snapshot_id,
- int64_t sequence_number, std::optional<int64_t>
first_row_id,
- std::shared_ptr<Schema> schema) {
- // TODO(xiao.dong): init v3 schema
- }
- Status StartAppending() override { return {}; }
- Status Append(const ManifestFile& file) override { return {}; }
- Result<ArrowArray> FinishAppending() override { return {}; }
+ int64_t sequence_number, std::optional<int64_t>
first_row_id)
+ : snapshot_id_(snapshot_id),
+ parent_snapshot_id_(parent_snapshot_id),
+ sequence_number_(sequence_number),
+ next_row_id_(first_row_id) {}
+ /// \brief Initialize the adapter (defined out of line).
+ Status Init() override;
+ /// \brief Append `file`; for a data manifest without a first-row-id, and
+ /// when `next_row_id_` is set, `next_row_id_` is then advanced by the file's
+ /// existing plus added row counts (a missing count is treated as 0).
+ Status Append(const ManifestFile& file) override;
+
+ protected:
+ /// \brief Return `file.sequence_number`, or `sequence_number_` when it is
+ /// unassigned and the file was added by `snapshot_id_`; otherwise an
+ /// `InvalidManifestList` error.
+ Result<int64_t> GetSequenceNumber(const ManifestFile& file) const override;
+ /// \brief Same resolution as `GetSequenceNumber`, applied to
+ /// `file.min_sequence_number`.
+ Result<int64_t> GetMinSequenceNumber(const ManifestFile& file) const
override;
+ /// \brief Return `next_row_id_` for a data manifest without a first-row-id,
+ /// `std::nullopt` for a non-data manifest, else the file's own first-row-id.
+ Result<std::optional<int64_t>> GetFirstRowId(const ManifestFile& file) const
override;
+
+ private:
+ /// \brief True when `file` is a data manifest whose `first_row_id` is unset.
+ bool WrappedFirstRowId(const ManifestFile& file) const;
private:
- std::shared_ptr<Schema> manifest_list_schema_;
- ArrowSchema schema_; // converted from manifest_list_schema_
+ // Snapshot id given at construction; compared with a manifest's
+ // `added_snapshot_id` before an unassigned sequence number is inherited.
+ int64_t snapshot_id_;
+ // Stored from the constructor; not read by the definitions shown in this diff.
+ std::optional<int64_t> parent_snapshot_id_;
+ // Sequence number handed to manifests whose own value is unassigned.
+ int64_t sequence_number_;
+ // Running first-row-id, seeded from the constructor's `first_row_id` and
+ // advanced by Append().
+ std::optional<int64_t> next_row_id_;
};
} // namespace iceberg