This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new a457099e feat: add FastAppend (#516)
a457099e is described below
commit a457099e60cdd763b758d71c4c6a77e122dcb181
Author: Junwang Zhao <[email protected]>
AuthorDate: Tue Jan 20 17:46:39 2026 +0800
feat: add FastAppend (#516)
---
src/iceberg/CMakeLists.txt | 2 +
src/iceberg/constants.h | 1 +
src/iceberg/inheritable_metadata.cc | 8 +-
src/iceberg/inheritable_metadata.h | 8 +-
src/iceberg/manifest/manifest_reader.cc | 23 ++-
src/iceberg/manifest/manifest_reader.h | 12 +-
src/iceberg/manifest/manifest_util.cc | 71 +++++++
src/iceberg/manifest/manifest_util_internal.h | 56 ++++++
src/iceberg/meson.build | 2 +
src/iceberg/table.cc | 7 +
src/iceberg/table.h | 3 +
src/iceberg/table_metadata.h | 1 -
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/fast_append_test.cc | 189 +++++++++++++++++++
src/iceberg/test/manifest_writer_versions_test.cc | 30 ++-
src/iceberg/test/update_test_base.h | 17 +-
src/iceberg/transaction.cc | 9 +-
src/iceberg/transaction.h | 3 +
src/iceberg/type_fwd.h | 4 +-
src/iceberg/update/fast_append.cc | 215 ++++++++++++++++++++++
src/iceberg/update/fast_append.h | 104 +++++++++++
src/iceberg/update/meson.build | 1 +
src/iceberg/update/snapshot_update.h | 29 ++-
src/iceberg/util/content_file_util.h | 67 +++++++
24 files changed, 823 insertions(+), 40 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 35c312f6..9ff802a1 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -49,6 +49,7 @@ set(ICEBERG_SOURCES
manifest/manifest_group.cc
manifest/manifest_list.cc
manifest/manifest_reader.cc
+ manifest/manifest_util.cc
manifest/manifest_writer.cc
manifest/rolling_manifest_writer.cc
manifest/v1_metadata.cc
@@ -85,6 +86,7 @@ set(ICEBERG_SOURCES
transform_function.cc
type.cc
update/expire_snapshots.cc
+ update/fast_append.cc
update/pending_update.cc
update/snapshot_update.cc
update/update_location.cc
diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h
index 89001f09..1d594162 100644
--- a/src/iceberg/constants.h
+++ b/src/iceberg/constants.h
@@ -32,6 +32,7 @@ namespace iceberg {
constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
constexpr int64_t kInvalidSnapshotId = -1;
+constexpr int64_t kInvalidSequenceNumber = -1;
/// \brief Stand-in for the current sequence number that will be assigned when the commit
/// is successful. This is replaced when writing a manifest list by the ManifestFile
/// adapter.
diff --git a/src/iceberg/inheritable_metadata.cc
b/src/iceberg/inheritable_metadata.cc
index 1d740b5c..7ff2ddbc 100644
--- a/src/iceberg/inheritable_metadata.cc
+++ b/src/iceberg/inheritable_metadata.cc
@@ -21,14 +21,16 @@
#include <utility>
-#include <iceberg/result.h>
-
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
-#include "iceberg/snapshot.h"
namespace iceberg {
+InheritableMetadata::~InheritableMetadata() = default;
+BaseInheritableMetadata::~BaseInheritableMetadata() = default;
+CopyInheritableMetadata::~CopyInheritableMetadata() = default;
+EmptyInheritableMetadata::~EmptyInheritableMetadata() = default;
+
BaseInheritableMetadata::BaseInheritableMetadata(int32_t spec_id, int64_t
snapshot_id,
int64_t sequence_number,
std::string manifest_location)
diff --git a/src/iceberg/inheritable_metadata.h
b/src/iceberg/inheritable_metadata.h
index f06693a4..8b5ddadc 100644
--- a/src/iceberg/inheritable_metadata.h
+++ b/src/iceberg/inheritable_metadata.h
@@ -39,7 +39,7 @@ namespace iceberg {
/// from the manifest file. This interface provides a way to apply such inheritance rules.
class ICEBERG_EXPORT InheritableMetadata {
public:
- virtual ~InheritableMetadata() = default;
+ virtual ~InheritableMetadata();
/// \brief Apply inheritable metadata to a manifest entry.
/// \param entry The manifest entry to modify.
@@ -61,6 +61,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public
InheritableMetadata {
Status Apply(ManifestEntry& entry) override;
+ ~BaseInheritableMetadata() override;
+
private:
int32_t spec_id_;
int64_t snapshot_id_;
@@ -72,6 +74,8 @@ class ICEBERG_EXPORT BaseInheritableMetadata : public
InheritableMetadata {
class ICEBERG_EXPORT EmptyInheritableMetadata : public InheritableMetadata {
public:
Status Apply(ManifestEntry& entry) override;
+
+ ~EmptyInheritableMetadata() override;
};
/// \brief Metadata inheritance for copying manifests before commit.
@@ -83,6 +87,8 @@ class ICEBERG_EXPORT CopyInheritableMetadata : public
InheritableMetadata {
Status Apply(ManifestEntry& entry) override;
+ ~CopyInheritableMetadata() override;
+
private:
int64_t snapshot_id_;
};
diff --git a/src/iceberg/manifest/manifest_reader.cc
b/src/iceberg/manifest/manifest_reader.cc
index 693b9fc5..53100b23 100644
--- a/src/iceberg/manifest/manifest_reader.cc
+++ b/src/iceberg/manifest/manifest_reader.cc
@@ -32,6 +32,7 @@
#include "iceberg/expression/expression.h"
#include "iceberg/expression/projections.h"
#include "iceberg/file_format.h"
+#include "iceberg/inheritable_metadata.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader_internal.h"
@@ -998,18 +999,22 @@ Result<std::unique_ptr<ManifestReader>>
ManifestReader::Make(
}
Result<std::unique_ptr<ManifestReader>> ManifestReader::Make(
- std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec) {
- if (file_io == nullptr || schema == nullptr || spec == nullptr) {
- return InvalidArgument(
- "FileIO, Schema, and PartitionSpec cannot be null to create ManifestReader");
+ std::string_view manifest_location, std::optional<int64_t> manifest_length,
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
+ std::shared_ptr<PartitionSpec> spec,
+ std::unique_ptr<InheritableMetadata> inheritable_metadata,
+ std::optional<int64_t> first_row_id) {
+ ICEBERG_PRECHECK(file_io != nullptr, "FileIO cannot be null to read manifest");
+ ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null to read manifest");
+ ICEBERG_PRECHECK(spec != nullptr, "PartitionSpec cannot be null to read manifest");
+
+ if (inheritable_metadata == nullptr) {
+ ICEBERG_ASSIGN_OR_RAISE(inheritable_metadata, InheritableMetadataFactory::Empty());
}
- // No metadata to inherit in this case.
- ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata, InheritableMetadataFactory::Empty());
return std::make_unique<ManifestReaderImpl>(
- std::string(manifest_location), std::nullopt, std::move(file_io), std::move(schema),
- std::move(spec), std::move(inheritable_metadata), std::nullopt);
+ std::string(manifest_location), manifest_length, std::move(file_io),
+ std::move(schema), std::move(spec), std::move(inheritable_metadata), first_row_id);
}
Result<std::unique_ptr<ManifestListReader>> ManifestListReader::Make(
diff --git a/src/iceberg/manifest/manifest_reader.h
b/src/iceberg/manifest/manifest_reader.h
index ddfefc57..1a142021 100644
--- a/src/iceberg/manifest/manifest_reader.h
+++ b/src/iceberg/manifest/manifest_reader.h
@@ -22,7 +22,9 @@
/// \file iceberg/manifest/manifest_reader.h
/// Data reader interface for manifest files.
+#include <cstdint>
#include <memory>
+#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
@@ -92,13 +94,19 @@ class ICEBERG_EXPORT ManifestReader {
/// \brief Creates a reader for a manifest file.
/// \param manifest_location Path to the manifest file.
+ /// \param manifest_length Length of the manifest file; may be std::nullopt.
/// \param file_io File IO implementation to use.
/// \param schema Schema used to bind the partition type.
/// \param spec Partition spec used for this manifest file.
+ /// \param inheritable_metadata Inheritable metadata; if null, an empty one is used.
+ /// \param first_row_id First row ID to use for the manifest entries.
/// \return A Result containing the reader or an error.
static Result<std::unique_ptr<ManifestReader>> Make(
- std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
- std::shared_ptr<Schema> schema, std::shared_ptr<PartitionSpec> spec);
+ std::string_view manifest_location, std::optional<int64_t> manifest_length,
+ std::shared_ptr<FileIO> file_io, std::shared_ptr<Schema> schema,
+ std::shared_ptr<PartitionSpec> spec,
+ std::unique_ptr<InheritableMetadata> inheritable_metadata,
+ std::optional<int64_t> first_row_id = std::nullopt);
/// \brief Add stats columns to the column list if needed.
static std::vector<std::string> WithStatsColumns(
diff --git a/src/iceberg/manifest/manifest_util.cc
b/src/iceberg/manifest/manifest_util.cc
new file mode 100644
index 00000000..12805452
--- /dev/null
+++ b/src/iceberg/manifest/manifest_util.cc
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+#include <optional>
+
+#include "iceberg/inheritable_metadata.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_util_internal.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<ManifestFile> CopyAppendManifest(
+ const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
+ const std::shared_ptr<Schema>& schema, const std::shared_ptr<PartitionSpec>& spec,
+ int64_t snapshot_id, const std::string& output_path, int8_t format_version,
+ SnapshotSummaryBuilder* summary_builder) {
+ // use metadata that will add the current snapshot's ID for the rewrite
+ // read first_row_id as null because this copies the incoming manifest before commit
+ ICEBERG_ASSIGN_OR_RAISE(auto inheritable_metadata,
+ InheritableMetadataFactory::ForCopy(snapshot_id));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto reader,
+ ManifestReader::Make(manifest.manifest_path, manifest.manifest_length, file_io,
+ schema, spec, std::move(inheritable_metadata),
+ /*first_row_id=*/std::nullopt));
+ ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+
+ // do not produce row IDs for the copy
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto writer, ManifestWriter::MakeWriter(
+ format_version, snapshot_id, output_path, file_io, spec, schema,
+ ManifestContent::kData, /*first_row_id=*/std::nullopt));
+
+ for (auto& entry : entries) {
+ ICEBERG_CHECK(entry.status == ManifestStatus::kAdded,
+ "Manifest to copy must only contain added entries");
+ if (summary_builder != nullptr && entry.data_file != nullptr) {
+ ICEBERG_RETURN_UNEXPECTED(summary_builder->AddedFile(*spec, *entry.data_file));
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(writer->Close());
+ return writer->ToManifestFile();
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_util_internal.h
b/src/iceberg/manifest/manifest_util_internal.h
new file mode 100644
index 00000000..e68437d0
--- /dev/null
+++ b/src/iceberg/manifest/manifest_util_internal.h
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_util_internal.h
+/// Internal utility functions for manifest operations.
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Copy an append manifest with a new snapshot ID.
+///
+/// This function copies a manifest file that contains only ADDED entries,
+/// rewriting it with a new snapshot ID. This is similar to Java's
+/// ManifestFiles.copyAppendManifest.
+///
+/// \param manifest The manifest file to copy
+/// \param file_io File IO implementation to use
+/// \param schema Table schema
+/// \param spec Partition spec for the manifest
+/// \param snapshot_id The new snapshot ID to assign to entries
+/// \param output_path Path where the new manifest will be written
+/// \param format_version Table format version
+/// \param summary_builder Optional builder updated per added data file; may be null
+/// \return The copied manifest file, or an error (e.g. if an entry is not ADDED)
+ICEBERG_EXPORT Result<ManifestFile> CopyAppendManifest(
+ const ManifestFile& manifest, const std::shared_ptr<FileIO>& file_io,
+ const std::shared_ptr<Schema>& schema, const std::shared_ptr<PartitionSpec>& spec,
+ int64_t snapshot_id, const std::string& output_path, int8_t format_version,
+ SnapshotSummaryBuilder* summary_builder = nullptr);
+
+} // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 317b4fa9..febe39c9 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -67,6 +67,7 @@ iceberg_sources = files(
'manifest/manifest_group.cc',
'manifest/manifest_list.cc',
'manifest/manifest_reader.cc',
+ 'manifest/manifest_util.cc',
'manifest/manifest_writer.cc',
'manifest/rolling_manifest_writer.cc',
'manifest/v1_metadata.cc',
@@ -103,6 +104,7 @@ iceberg_sources = files(
'transform_function.cc',
'type.cc',
'update/expire_snapshots.cc',
+ 'update/fast_append.cc',
'update/pending_update.cc',
'update/snapshot_update.cc',
'update/update_location.cc',
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 5c406deb..28ee285a 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -199,6 +199,13 @@ Result<std::shared_ptr<UpdateLocation>>
Table::NewUpdateLocation() {
return transaction->NewUpdateLocation();
}
+Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
+ /*auto_commit=*/true));
+ return transaction->NewFastAppend();
+}
+
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index fd346e15..75cad6e1 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -156,6 +156,9 @@ class ICEBERG_EXPORT Table : public
std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
+ /// \brief Create a new FastAppend to append data files and commit the changes.
+ virtual Result<std::shared_ptr<FastAppend>> NewFastAppend();
+
protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index a4165b81..6631b9d2 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -75,7 +75,6 @@ struct ICEBERG_EXPORT TableMetadata {
static constexpr int8_t kMinFormatVersionRowLineage = 3;
static constexpr int8_t kMinFormatVersionDefaultValues = 3;
static constexpr int64_t kInitialSequenceNumber = 0;
- static constexpr int64_t kInvalidSequenceNumber = -1;
static constexpr int64_t kInitialRowId = 0;
static inline const std::unordered_map<TypeId, int8_t> kMinFormatVersions =
{};
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index d243a48b..3414a862 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
expire_snapshots_test.cc
+ fast_append_test.cc
transaction_test.cc
update_location_test.cc
update_partition_spec_test.cc
diff --git a/src/iceberg/test/fast_append_test.cc
b/src/iceberg/test/fast_append_test.cc
new file mode 100644
index 00000000..7c79d5e9
--- /dev/null
+++ b/src/iceberg/test/fast_append_test.cc
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/fast_append.h"
+
+#include <format>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/test_resource.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/util/uuid.h"
+
+namespace iceberg {
+
+class FastAppendTest : public UpdateTestBase {
+ protected:
+ static void SetUpTestSuite() { avro::RegisterAll(); }
+
+ void SetUp() override {
+ InitializeFileIO();
+ // Use minimal metadata for FastAppend tests
+ RegisterTableFromResource("TableMetadataV2ValidMinimal.json");
+
+ // Get partition spec and schema from the base table
+ ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
+ ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());
+
+ // Create test data files: a = 100 records/1024 bytes, b = 200 records/2048 bytes
+ file_a_ =
+ CreateDataFile("/data/file_a.parquet", /*record_count=*/100, /*size=*/1024);
+ file_b_ =
+ CreateDataFile("/data/file_b.parquet", /*record_count=*/200, /*size=*/2048);
+ }
+
+ std::shared_ptr<DataFile> CreateDataFile(const std::string& path, int64_t record_count,
+ int64_t size, int64_t partition_value = 0) {
+ auto data_file = std::make_shared<DataFile>();
+ data_file->content = DataFile::Content::kData;
+ data_file->file_path = table_location_ + path;
+ data_file->file_format = FileFormatType::kParquet;
+ // The base table has partition spec with identity(x), so we need 1 partition value
+ data_file->partition =
+ PartitionValues(std::vector<Literal>{Literal::Long(partition_value)});
+ data_file->file_size_in_bytes = size;
+ data_file->record_count = record_count;
+ data_file->partition_spec_id = spec_->spec_id();
+ return data_file;
+ }
+
+ std::shared_ptr<PartitionSpec> spec_;
+ std::shared_ptr<Schema> schema_;
+ std::shared_ptr<DataFile> file_a_;
+ std::shared_ptr<DataFile> file_b_;
+};
+
+TEST_F(FastAppendTest, AppendDataFile) {
+ std::shared_ptr<FastAppend> fast_append;
+ ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
+ fast_append->AppendFile(file_a_);
+
+ EXPECT_THAT(fast_append->Commit(), IsOk());
+
+ EXPECT_THAT(table_->Refresh(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+ EXPECT_EQ(snapshot->summary.at("added-data-files"), "1");
+ EXPECT_EQ(snapshot->summary.at("added-records"), "100");
+ EXPECT_EQ(snapshot->summary.at("added-files-size"), "1024");
+}
+
+TEST_F(FastAppendTest, AppendMultipleDataFiles) {
+ std::shared_ptr<FastAppend> fast_append;
+ ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
+ fast_append->AppendFile(file_a_);
+ fast_append->AppendFile(file_b_);
+
+ EXPECT_THAT(fast_append->Commit(), IsOk());
+
+ EXPECT_THAT(table_->Refresh(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+ EXPECT_EQ(snapshot->summary.at("added-data-files"), "2");
+ EXPECT_EQ(snapshot->summary.at("added-records"), "300");
+ EXPECT_EQ(snapshot->summary.at("added-files-size"), "3072");
+}
+
+TEST_F(FastAppendTest, AppendManyFiles) {
+ std::shared_ptr<FastAppend> fast_append;
+ ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
+
+ int64_t total_records = 0;
+ int64_t total_size = 0;
+ constexpr int kFileCount = 10;
+ for (int index = 0; index < kFileCount; ++index) {
+ auto data_file = CreateDataFile(std::format("/data/file_{}.parquet", index),
+ /*record_count=*/10 + index,
+ /*size=*/100 + index * 10,
+ /*partition_value=*/index % 2);
+ total_records += data_file->record_count;
+ total_size += data_file->file_size_in_bytes;
+ fast_append->AppendFile(std::move(data_file));
+ }
+
+ EXPECT_THAT(fast_append->Commit(), IsOk());
+
+ EXPECT_THAT(table_->Refresh(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+ EXPECT_EQ(snapshot->summary.at("added-data-files"), std::to_string(kFileCount));
+ EXPECT_EQ(snapshot->summary.at("added-records"), std::to_string(total_records));
+ EXPECT_EQ(snapshot->summary.at("added-files-size"), std::to_string(total_size));
+}
+
+TEST_F(FastAppendTest, EmptyTableAppendUpdatesSequenceNumbers) {
+ EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
+ const int64_t base_sequence_number = table_->metadata()->last_sequence_number;
+
+ std::shared_ptr<FastAppend> fast_append;
+ ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
+ fast_append->AppendFile(file_a_);
+
+ EXPECT_THAT(fast_append->Commit(), IsOk());
+
+ EXPECT_THAT(table_->Refresh(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+ EXPECT_EQ(snapshot->sequence_number, base_sequence_number + 1);
+ EXPECT_EQ(table_->metadata()->last_sequence_number, base_sequence_number + 1);
+}
+
+TEST_F(FastAppendTest, AppendNullFile) {
+ std::shared_ptr<FastAppend> fast_append;
+ ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
+ fast_append->AppendFile(nullptr);
+
+ auto result = fast_append->Commit();
+ EXPECT_FALSE(result.has_value());
+ EXPECT_THAT(result, HasErrorMessage("Invalid data file: null"));
+ EXPECT_THAT(table_->current_snapshot(), HasErrorMessage("No current snapshot"));
+}
+
+TEST_F(FastAppendTest, AppendDuplicateFile) {
+ std::shared_ptr<FastAppend> fast_append;
+ ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
+ fast_append->AppendFile(file_a_);
+ fast_append->AppendFile(file_a_); // Add same file twice
+
+ EXPECT_THAT(fast_append->Commit(), IsOk());
+
+ EXPECT_THAT(table_->Refresh(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+ // Should only count the file once
+ EXPECT_EQ(snapshot->summary.at("added-data-files"), "1");
+ EXPECT_EQ(snapshot->summary.at("added-records"), "100");
+}
+
+TEST_F(FastAppendTest, SetSnapshotProperty) {
+ std::shared_ptr<FastAppend> fast_append;
+ ICEBERG_UNWRAP_OR_FAIL(fast_append, table_->NewFastAppend());
+ fast_append->Set("custom-property", "custom-value");
+ fast_append->AppendFile(file_a_);
+
+ EXPECT_THAT(fast_append->Commit(), IsOk());
+
+ EXPECT_THAT(table_->Refresh(), IsOk());
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+ EXPECT_EQ(snapshot->summary.at("custom-property"), "custom-value");
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/manifest_writer_versions_test.cc
b/src/iceberg/test/manifest_writer_versions_test.cc
index 70d00504..cc4e804b 100644
--- a/src/iceberg/test/manifest_writer_versions_test.cc
+++ b/src/iceberg/test/manifest_writer_versions_test.cc
@@ -27,6 +27,7 @@
#include "iceberg/arrow/arrow_file_io.h"
#include "iceberg/avro/avro_register.h"
+#include "iceberg/constants.h"
#include "iceberg/file_format.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
@@ -411,12 +412,11 @@ class ManifestWriterVersionsTest : public ::testing::Test
{
TEST_F(ManifestWriterVersionsTest, TestV1Write) {
auto manifest = WriteManifest(/*format_version=*/1, {data_file_});
- CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
- TableMetadata::kInvalidSequenceNumber);
+ CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber);
auto entries = ReadManifest(manifest);
ASSERT_EQ(entries.size(), 1);
- CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
- TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData);
+ CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber,
+ DataFile::Content::kData);
}
TEST_F(ManifestWriterVersionsTest, TestV1WriteDelete) {
@@ -449,13 +449,12 @@ TEST_F(ManifestWriterVersionsTest,
TestV1WriteWithInheritance) {
TEST_F(ManifestWriterVersionsTest, TestV2Write) {
auto manifest = WriteManifest(/*format_version=*/2, {data_file_});
- CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
- TableMetadata::kInvalidSequenceNumber);
+ CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber);
auto entries = ReadManifest(manifest);
ASSERT_EQ(entries.size(), 1);
ASSERT_EQ(manifest.content, ManifestContent::kData);
- CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
- TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData);
+ CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber,
+ DataFile::Content::kData);
}
TEST_F(ManifestWriterVersionsTest, TestV2WriteWithInheritance) {
@@ -470,8 +469,7 @@ TEST_F(ManifestWriterVersionsTest,
TestV2WriteWithInheritance) {
TEST_F(ManifestWriterVersionsTest, TestV2PlusWriteDeleteV2) {
auto manifest = WriteDeleteManifest(/*format_version=*/2, delete_file_);
- CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
- TableMetadata::kInvalidSequenceNumber);
+ CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber);
auto entries = ReadManifest(manifest);
ASSERT_EQ(entries.size(), 1);
ASSERT_EQ(manifest.content, ManifestContent::kDeletes);
@@ -507,7 +505,7 @@ TEST_F(ManifestWriterVersionsTest,
TestV2ManifestRewriteWithInheritance) {
// rewrite the manifest file using a v2 manifest
auto rewritten_manifest = RewriteManifest(manifests[0], 2);
- CheckRewrittenManifest(rewritten_manifest,
TableMetadata::kInvalidSequenceNumber,
+ CheckRewrittenManifest(rewritten_manifest, kInvalidSequenceNumber,
TableMetadata::kInitialSequenceNumber);
// add the v2 manifest to a v2 manifest list, with a sequence number
@@ -525,14 +523,12 @@ TEST_F(ManifestWriterVersionsTest,
TestV2ManifestRewriteWithInheritance) {
TEST_F(ManifestWriterVersionsTest, TestV3Write) {
auto manifest = WriteManifest(/*format_version=*/3, {data_file_});
- CheckManifest(manifest, TableMetadata::kInvalidSequenceNumber,
- TableMetadata::kInvalidSequenceNumber);
+ CheckManifest(manifest, kInvalidSequenceNumber, kInvalidSequenceNumber);
auto entries = ReadManifest(manifest);
ASSERT_EQ(entries.size(), 1);
ASSERT_EQ(manifest.content, ManifestContent::kData);
- CheckEntry(entries[0], TableMetadata::kInvalidSequenceNumber,
- TableMetadata::kInvalidSequenceNumber, DataFile::Content::kData,
- ManifestStatus::kAdded, kFirstRowId);
+ CheckEntry(entries[0], kInvalidSequenceNumber, kInvalidSequenceNumber,
+ DataFile::Content::kData, ManifestStatus::kAdded, kFirstRowId);
}
TEST_F(ManifestWriterVersionsTest, TestV3WriteWithInheritance) {
@@ -598,7 +594,7 @@ TEST_F(ManifestWriterVersionsTest,
TestV3ManifestRewriteWithInheritance) {
// rewrite the manifest file using a v3 manifest
auto rewritten_manifest = RewriteManifest(manifests[0], 3);
- CheckRewrittenManifest(rewritten_manifest,
TableMetadata::kInvalidSequenceNumber,
+ CheckRewrittenManifest(rewritten_manifest, kInvalidSequenceNumber,
TableMetadata::kInitialSequenceNumber);
// add the v3 manifest to a v3 manifest list, with a sequence number
diff --git a/src/iceberg/test/update_test_base.h
b/src/iceberg/test/update_test_base.h
index c78dc4d0..c14cb76b 100644
--- a/src/iceberg/test/update_test_base.h
+++ b/src/iceberg/test/update_test_base.h
@@ -41,6 +41,12 @@ namespace iceberg {
class UpdateTestBase : public ::testing::Test {
protected:
void SetUp() override {
+ InitializeFileIO();
+ RegisterTableFromResource("TableMetadataV2Valid.json");
+ }
+
+ /// \brief Initialize file IO and create necessary directories.
+ void InitializeFileIO() {
file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
catalog_ =
InMemoryCatalog::Make("test_catalog", file_io_, "/warehouse/",
/*properties=*/{});
@@ -50,12 +56,19 @@ class UpdateTestBase : public ::testing::Test {
static_cast<arrow::ArrowFileSystemFileIO&>(*file_io_).fs());
ASSERT_TRUE(arrow_fs != nullptr);
ASSERT_TRUE(arrow_fs->CreateDir(table_location_ + "/metadata").ok());
+ }
+
+ /// \brief Register a table from a metadata resource file.
+ ///
+ /// \param resource_name The name of the metadata resource file
+ void RegisterTableFromResource(const std::string& resource_name) {
+ // Drop existing table if it exists
+ std::ignore = catalog_->DropTable(table_ident_, /*purge=*/false);
// Write table metadata to the table location.
auto metadata_location = std::format("{}/metadata/00001-{}.metadata.json",
table_location_,
Uuid::GenerateV7().ToString());
- ICEBERG_UNWRAP_OR_FAIL(auto metadata,
-
ReadTableMetadataFromResource("TableMetadataV2Valid.json"));
+ ICEBERG_UNWRAP_OR_FAIL(auto metadata,
ReadTableMetadataFromResource(resource_name));
metadata->location = table_location_;
ASSERT_THAT(TableMetadataUtil::Write(*file_io_, metadata_location,
*metadata),
IsOk());
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 10a87e65..d10586a4 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -23,7 +23,6 @@
#include <optional>
#include "iceberg/catalog.h"
-#include "iceberg/schema.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
@@ -31,6 +30,7 @@
#include "iceberg/table_requirements.h"
#include "iceberg/table_update.h"
#include "iceberg/update/expire_snapshots.h"
+#include "iceberg/update/fast_append.h"
#include "iceberg/update/pending_update.h"
#include "iceberg/update/snapshot_update.h"
#include "iceberg/update/update_location.h"
@@ -293,4 +293,11 @@ Result<std::shared_ptr<UpdateLocation>>
Transaction::NewUpdateLocation() {
return update_location;
}
+Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
+ ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<FastAppend> fast_append,
+ FastAppend::Make(table_->name().name, shared_from_this()));
+ ICEBERG_RETURN_UNEXPECTED(AddUpdate(fast_append));
+ return fast_append;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 7133a3b5..0f567312 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -86,6 +86,9 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
+ /// \brief Create a new FastAppend to append data files and commit the changes.
+ ///
+ /// The returned FastAppend has already been registered with this transaction
+ /// (via AddUpdate).
+ Result<std::shared_ptr<FastAppend>> NewFastAppend();
+
private:
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
std::unique_ptr<TableMetadataBuilder> metadata_builder);
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 251334c1..5bf03a00 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -120,6 +120,8 @@ struct SnapshotLogEntry;
struct SnapshotRef;
struct StatisticsFile;
struct TableMetadata;
+class InheritableMetadata;
+class SnapshotSummaryBuilder;
/// \brief Expression.
class BoundPredicate;
@@ -188,6 +190,7 @@ class Transaction;
/// \brief Update family.
class ExpireSnapshots;
+class FastAppend;
class PendingUpdate;
class SnapshotUpdate;
class UpdateLocation;
@@ -200,7 +203,6 @@ class UpdateSortOrder;
/// TODO: Forward declarations below are not added yet.
/// ----------------------------------------------------------------------------
-class AppendFiles;
class EncryptedKey;
} // namespace iceberg
diff --git a/src/iceberg/update/fast_append.cc b/src/iceberg/update/fast_append.cc
new file mode 100644
index 00000000..c7f66f2f
--- /dev/null
+++ b/src/iceberg/update/fast_append.cc
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/fast_append.h"
+
+#include <iterator>
+#include <ranges>
+#include <vector>
+
+#include "iceberg/constants.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_util_internal.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/error_collector.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::unique_ptr<FastAppend>> FastAppend::Make(
+    std::string table_name, std::shared_ptr<Transaction> transaction) {
+  // Reject unusable arguments before constructing anything.
+  ICEBERG_PRECHECK(!table_name.empty(), "Table name cannot be empty");
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create FastAppend without a transaction");
+  // The constructor is private, so std::make_unique is not an option here.
+  return std::unique_ptr<FastAppend>{
+      new FastAppend(std::move(table_name), std::move(transaction))};
+}
+
+// Stores the table name; the transaction is handed to SnapshotUpdate.
+FastAppend::FastAppend(std::string table_name,
+                       std::shared_ptr<Transaction> transaction)
+    : SnapshotUpdate{std::move(transaction)}, table_name_{std::move(table_name)} {}
+
+FastAppend& FastAppend::AppendFile(const std::shared_ptr<DataFile>& file) {
+  ICEBERG_BUILDER_CHECK(file != nullptr, "Invalid data file: null");
+  ICEBERG_BUILDER_CHECK(file->partition_spec_id.has_value(),
+                        "Data file must have partition spec ID");
+
+  // Look up the partition spec before touching the per-spec file sets.
+  const int32_t file_spec_id = file->partition_spec_id.value();
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto file_spec, Spec(file_spec_id));
+
+  // DataFileSet deduplicates by path: a file already appended under this spec
+  // neither marks the update dirty nor counts towards the summary again.
+  const bool is_new_file = new_data_files_by_spec_[file_spec_id].insert(file).second;
+  if (!is_new_file) {
+    return *this;
+  }
+
+  has_new_files_ = true;
+  ICEBERG_BUILDER_RETURN_IF_ERROR(summary_.AddedFile(*file_spec, *file));
+  return *this;
+}
+
+FastAppend& FastAppend::AppendManifest(const ManifestFile& manifest) {
+  // Only fresh manifests may be appended: added files only, and no snapshot ID
+  // or sequence number assigned yet (both are assigned during commit).
+  ICEBERG_BUILDER_CHECK(!manifest.has_existing_files(),
+                        "Cannot append manifest with existing files");
+  ICEBERG_BUILDER_CHECK(!manifest.has_deleted_files(),
+                        "Cannot append manifest with deleted files");
+  ICEBERG_BUILDER_CHECK(manifest.added_snapshot_id == kInvalidSnapshotId,
+                        "Snapshot id must be assigned during commit");
+  ICEBERG_BUILDER_CHECK(manifest.sequence_number == kInvalidSequenceNumber,
+                        "Sequence number must be assigned during commit");
+
+  // The checks above already guarantee the manifest carries no snapshot ID, so
+  // whether it can be used as-is depends only on snapshot ID inheritance.
+  if (can_inherit_snapshot_id()) {
+    summary_.AddedManifest(manifest);
+    append_manifests_.push_back(manifest);
+    return *this;
+  }
+
+  // Inheritance is disabled: the manifest must be rewritten with this update's
+  // snapshot ID. The copy is owned by the table; CleanUncommitted deletes it if
+  // it is not committed.
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto copied_manifest, CopyManifest(manifest));
+  rewritten_append_manifests_.push_back(std::move(copied_manifest));
+  return *this;
+}
+
+std::string FastAppend::operation() {
+  // FastAppend only ever adds files, so it always reports an append.
+  return std::string(DataOperation::kAppend);
+}
+
+Result<std::vector<ManifestFile>> FastAppend::Apply(
+    const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& snapshot) {
+  // 1) Manifests written for files added through AppendFile.
+  ICEBERG_ASSIGN_OR_RAISE(auto written, WriteNewManifests());
+
+  std::vector<ManifestFile> result;
+  result.reserve(written.size() + append_manifests_.size() +
+                 rewritten_append_manifests_.size());
+  result.insert(result.end(), std::make_move_iterator(written.begin()),
+                std::make_move_iterator(written.end()));
+
+  // 2) Appended manifests, then rewritten ones. Both stored lists are stamped
+  //    in place with this update's snapshot ID before being added.
+  const int64_t current_snapshot_id = SnapshotId();
+  for (std::vector<ManifestFile>* group :
+       {&append_manifests_, &rewritten_append_manifests_}) {
+    for (ManifestFile& manifest : *group) {
+      manifest.added_snapshot_id = current_snapshot_id;
+      result.push_back(manifest);
+    }
+  }
+
+  // 3) Every manifest of the given snapshot, if one was provided, unchanged.
+  if (snapshot) {
+    auto cached_snapshot = SnapshotCache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto carried_over,
+                            cached_snapshot.Manifests(transaction_->table()->io()));
+    result.insert(result.end(), carried_over.begin(), carried_over.end());
+  }
+
+  return result;
+}
+
+std::unordered_map<std::string, std::string> FastAppend::Summary() {
+  // Honor the table's configured partition summary limit before building.
+  const auto& table_properties = base().properties;
+  summary_.SetPartitionSummaryLimit(
+      table_properties.Get(TableProperties::kWritePartitionSummaryLimit));
+  return summary_.Build();
+}
+
+void FastAppend::CleanUncommitted(const std::unordered_set<std::string>& committed) {
+  // Delete every manifest written by WriteNewManifests whose path is not in
+  // `committed`, then forget the whole batch. Dropping the list even when all of
+  // it was committed means WriteNewManifests can never delete a committed
+  // manifest later; it simply writes a fresh batch if asked again.
+  for (const auto& written : new_manifests_) {
+    if (!committed.contains(written.manifest_path)) {
+      std::ignore = DeleteFile(written.manifest_path);
+    }
+  }
+  new_manifests_.clear();
+
+  // Rewritten append manifests are always owned by the table, so uncommitted
+  // ones are deleted. Manifests appended as-is are left alone: they are added to
+  // the manifest list directly and are not compacted.
+  for (const auto& rewritten : rewritten_append_manifests_) {
+    if (!committed.contains(rewritten.manifest_path)) {
+      std::ignore = DeleteFile(rewritten.manifest_path);
+    }
+  }
+}
+
+bool FastAppend::CleanupAfterCommit() const {
+  // Post-commit cleanup is only needed when AppendManifest rewrote manifests:
+  //  1.) manifests appended as-is are never rewritten, so there is nothing of
+  //      ours to clean up;
+  //  2.) manifests written for AppendFile are already cleaned up between commit
+  //      attempts in WriteNewManifests.
+  const bool has_rewritten_manifests = !rewritten_append_manifests_.empty();
+  return has_rewritten_manifests;
+}
+
+Result<std::shared_ptr<PartitionSpec>> FastAppend::Spec(int32_t spec_id) {
+  // Partition specs are resolved against the base table metadata.
+  const TableMetadata& metadata = base();
+  return metadata.PartitionSpecById(spec_id);
+}
+
+Result<ManifestFile> FastAppend::CopyManifest(const ManifestFile& manifest) {
+  const TableMetadata& metadata = base();
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          metadata.PartitionSpecById(manifest.partition_spec_id));
+
+  // Destination of the rewritten manifest and the snapshot ID to stamp it with.
+  std::string target_path = ManifestPath();
+  const int64_t target_snapshot_id = SnapshotId();
+
+  // Copy the manifest to target_path under this update's snapshot ID, also
+  // handing over the table's format version and this update's summary builder.
+  return CopyAppendManifest(manifest, transaction_->table()->io(), schema, spec,
+                            target_snapshot_id, target_path, metadata.format_version,
+                            &summary_);
+}
+
+Result<std::vector<ManifestFile>> FastAppend::WriteNewManifests() {
+  // Files were appended after the cached manifests were written: delete the
+  // stale manifests so a fresh batch is produced below.
+  if (has_new_files_ && !new_manifests_.empty()) {
+    for (const auto& stale : new_manifests_) {
+      std::ignore = DeleteFile(stale.manifest_path);
+    }
+    new_manifests_.clear();
+  }
+
+  // Either the cached batch is still valid, or there is nothing to write.
+  if (!new_manifests_.empty() || new_data_files_by_spec_.empty()) {
+    return new_manifests_;
+  }
+
+  // Write the data manifests for each partition spec in turn.
+  for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, Spec(spec_id));
+    std::vector<std::shared_ptr<DataFile>> files(data_files.begin(), data_files.end());
+    ICEBERG_ASSIGN_OR_RAISE(auto batch, WriteDataManifests(files, spec));
+    new_manifests_.insert(new_manifests_.end(), std::make_move_iterator(batch.begin()),
+                          std::make_move_iterator(batch.end()));
+  }
+  has_new_files_ = false;
+
+  return new_manifests_;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/update/fast_append.h b/src/iceberg/update/fast_append.h
new file mode 100644
index 00000000..87887c74
--- /dev/null
+++ b/src/iceberg/update/fast_append.h
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/update/fast_append.h
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/snapshot_update.h"
+#include "iceberg/util/content_file_util.h"
+
+namespace iceberg {
+
+/// \brief Appending new files in a table.
+///
+/// FastAppend is optimized for appending new data files to a table: it creates new
+/// manifest files for the added data without compacting or rewriting existing
+/// manifests, making it faster for write-heavy workloads.
+class ICEBERG_EXPORT FastAppend : public SnapshotUpdate {
+ public:
+  /// \brief Create a new FastAppend instance.
+  ///
+  /// \param table_name The name of the table; must not be empty
+  /// \param transaction The transaction to use for this update; must not be null
+  /// \return A Result containing the FastAppend instance or an error
+  static Result<std::unique_ptr<FastAppend>> Make(
+      std::string table_name, std::shared_ptr<Transaction> transaction);
+
+  /// \brief Append a data file to this update.
+  ///
+  /// The file must be non-null and carry a partition spec ID known to the table.
+  /// Appending a file whose path was already appended under the same partition
+  /// spec ID has no effect.
+  ///
+  /// \param file The data file to append
+  /// \return Reference to this for method chaining
+  FastAppend& AppendFile(const std::shared_ptr<DataFile>& file);
+
+  /// \brief Append a manifest file to this update.
+  ///
+  /// The manifest must only contain added files (no existing or deleted files)
+  /// and must not have a snapshot ID or sequence number assigned yet. If snapshot
+  /// ID inheritance is enabled, it will be used directly. Otherwise, it will be
+  /// copied with the new snapshot ID.
+  ///
+  /// \param manifest The manifest file to append
+  /// \return Reference to this for method chaining
+  FastAppend& AppendManifest(const ManifestFile& manifest);
+
+  /// \brief Returns DataOperation::kAppend.
+  std::string operation() override;
+
+  /// \brief Compute the manifests of the new snapshot: manifests written for the
+  /// appended data files, appended manifests stamped with this update's snapshot
+  /// ID, then every manifest of \p snapshot when it is non-null.
+  Result<std::vector<ManifestFile>> Apply(
+      const TableMetadata& metadata_to_update,
+      const std::shared_ptr<Snapshot>& snapshot) override;
+
+  /// \brief Build the snapshot summary, applying the table's partition summary limit.
+  std::unordered_map<std::string, std::string> Summary() override;
+
+  /// \brief Delete manifests written or rewritten by this update whose paths are
+  /// not in \p committed.
+  void CleanUncommitted(const std::unordered_set<std::string>& committed) override;
+
+  /// \brief Returns true only when AppendManifest had to rewrite manifests.
+  bool CleanupAfterCommit() const override;
+
+ private:
+  explicit FastAppend(std::string table_name, std::shared_ptr<Transaction> transaction);
+
+  /// \brief Get the partition spec by spec ID.
+  Result<std::shared_ptr<PartitionSpec>> Spec(int32_t spec_id);
+
+  /// \brief Copy a manifest file with a new snapshot ID.
+  ///
+  /// \param manifest The manifest to copy
+  /// \return The copied manifest file
+  Result<ManifestFile> CopyManifest(const ManifestFile& manifest);
+
+  /// \brief Write new manifests for the accumulated data files, reusing the
+  /// previously written ones when no files were appended since.
+  ///
+  /// \return A vector of manifest files, or an error
+  Result<std::vector<ManifestFile>> WriteNewManifests();
+
+  std::string table_name_;
+  // Data files appended via AppendFile, grouped by partition spec ID.
+  std::unordered_map<int32_t, DataFileSet> new_data_files_by_spec_;
+  // Manifests appended via AppendManifest and used as-is.
+  std::vector<ManifestFile> append_manifests_;
+  // Copies of appended manifests rewritten with this update's snapshot ID.
+  std::vector<ManifestFile> rewritten_append_manifests_;
+  // Manifests written by WriteNewManifests for new_data_files_by_spec_.
+  std::vector<ManifestFile> new_manifests_;
+  // Whether files were appended since new_manifests_ was last written.
+  bool has_new_files_{false};
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 3387fd11..4ca40684 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -18,6 +18,7 @@
install_headers(
[
'expire_snapshots.h',
+ 'fast_append.h',
'pending_update.h',
'snapshot_update.h',
'update_location.h',
diff --git a/src/iceberg/update/snapshot_update.h b/src/iceberg/update/snapshot_update.h
index 48ef1676..f31327fc 100644
--- a/src/iceberg/update/snapshot_update.h
+++ b/src/iceberg/update/snapshot_update.h
@@ -51,6 +51,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
~SnapshotUpdate() override;
+  /// \brief Returns Kind::kUpdateSnapshot.
+  Kind kind() const override {
+    return Kind::kUpdateSnapshot;
+  }
+
/// \brief Set a callback to delete files instead of the table's default.
///
/// \param delete_func A function used to delete file locations
@@ -74,6 +76,16 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
return self;
}
+  /// \brief Set a property on this update's snapshot summary builder.
+  ///
+  /// \param property The summary property name
+  /// \param value The summary property value
+  /// \return Reference to this for method chaining
+  auto& Set(this auto& self, const std::string& property, const std::string& value) {
+    auto& summary = self.summary_;
+    summary.Set(property, value);
+    return self;
+  }
+
/// \brief Apply the update's changes to create a new snapshot.
///
/// This method validates the changes, applies them to the metadata,
@@ -95,6 +107,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
/// \param spec The partition spec to use
/// \param data_sequence_number Optional data sequence number for the files
/// \return A vector of manifest files
+ /// TODO(xxx): Change the signature to accept an iterator pair (begin/end) instead of a
+ /// vector to avoid intermediate vector allocations (e.g., from DataFileSet).
Result<std::vector<ManifestFile>> WriteDataManifests(
const std::vector<std::shared_ptr<DataFile>>& data_files,
const std::shared_ptr<PartitionSpec>& spec,
@@ -167,6 +181,16 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
/// \brief Get or generate the snapshot ID for the new snapshot.
int64_t SnapshotId();
+ /// \brief Delete a file at the given path.
+ ///
+ /// \param path The path of the file to delete
+ /// \return A status indicating the result of the deletion
+ Status DeleteFile(const std::string& path);
+
+ /// \brief Generate a path for a new manifest file.
+ /// NOTE(review): subclasses (e.g. FastAppend::CopyManifest) assume a unique
+ /// path per call — confirm against the implementation.
+ std::string ManifestPath();
+ /// \brief Generate a path for a manifest list file — presumably the one of the
+ /// snapshot being produced; confirm against the implementation.
+ std::string ManifestListPath();
+ /// \brief Mutable access to the snapshot summary builder (the summary_ member).
+ SnapshotSummaryBuilder& summary_builder() { return summary_; }
+
private:
/// \brief Returns the snapshot summary from the implementation and updates totals.
Result<std::unordered_map<std::string, std::string>> ComputeSummary(
@@ -175,9 +199,8 @@ class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
/// \brief Clean up all uncommitted files
void CleanAll();
- Status DeleteFile(const std::string& path);
- std::string ManifestListPath();
- std::string ManifestPath();
+ protected:
+ SnapshotSummaryBuilder summary_;
private:
const bool can_inherit_snapshot_id_{true};
diff --git a/src/iceberg/util/content_file_util.h b/src/iceberg/util/content_file_util.h
index e173a41a..95a8d634 100644
--- a/src/iceberg/util/content_file_util.h
+++ b/src/iceberg/util/content_file_util.h
@@ -27,6 +27,7 @@
#include <span>
#include <string>
#include <unordered_set>
+#include <vector>
#include "iceberg/iceberg_export.h"
#include "iceberg/manifest/manifest_entry.h"
@@ -35,6 +36,72 @@
namespace iceberg {
+/// \brief A set of DataFile pointers that keeps insertion order and deduplicates
+/// by file path.
+///
+/// The path index stores string views into each stored DataFile's `file_path`,
+/// so a file's path must not be modified while the file is in the set.
+class ICEBERG_EXPORT DataFileSet {
+ public:
+  using value_type = std::shared_ptr<DataFile>;
+  using iterator = typename std::vector<value_type>::iterator;
+  using const_iterator = typename std::vector<value_type>::const_iterator;
+  using difference_type = typename std::vector<value_type>::difference_type;
+
+  DataFileSet() = default;
+
+  /// \brief Insert a data file into the set.
+  ///
+  /// A null pointer is not inserted; the call then returns {end(), false}.
+  ///
+  /// \param file The data file to insert
+  /// \return A pair with an iterator to the inserted element (or to the element
+  /// already stored under the same path) and a bool indicating whether insertion
+  /// took place
+  std::pair<iterator, bool> insert(const value_type& file) { return InsertImpl(file); }
+
+  /// \brief Insert a data file into the set (move version).
+  std::pair<iterator, bool> insert(value_type&& file) {
+    return InsertImpl(std::move(file));
+  }
+
+  /// \brief Number of distinct files in the set.
+  size_t size() const { return elements_.size(); }
+
+  /// \brief Whether the set holds no files.
+  bool empty() const { return elements_.empty(); }
+
+  /// \brief Remove every file together with the path index.
+  void clear() {
+    elements_.clear();
+    index_by_path_.clear();
+  }
+
+  /// \brief Iterators over the files in insertion order.
+  iterator begin() { return elements_.begin(); }
+  iterator end() { return elements_.end(); }
+  const_iterator begin() const { return elements_.begin(); }
+  const_iterator end() const { return elements_.end(); }
+  const_iterator cbegin() const { return elements_.cbegin(); }
+  const_iterator cend() const { return elements_.cend(); }
+
+ private:
+  std::pair<iterator, bool> InsertImpl(value_type file) {
+    if (file == nullptr) {
+      return {elements_.end(), false};
+    }
+
+    // Claim the next position for this path; an existing entry means the path
+    // was inserted before, so report the element that was stored first.
+    const size_t next_position = elements_.size();
+    const auto [entry, claimed] =
+        index_by_path_.try_emplace(file->file_path, next_position);
+    if (!claimed) {
+      return {elements_.begin() + static_cast<difference_type>(entry->second), false};
+    }
+
+    elements_.push_back(std::move(file));
+    return {elements_.end() - 1, true};
+  }
+
+  // Files in insertion order.
+  std::vector<value_type> elements_;
+  // file_path -> position in elements_; keys view into the stored DataFiles.
+  std::unordered_map<std::string_view, size_t, StringHash, StringEqual> index_by_path_;
+};
+
/// \brief Utility functions for content files.
struct ICEBERG_EXPORT ContentFileUtil {
/// \brief Check if a delete file is a deletion vector (DV).