wgtmac commented on code in PR #490: URL: https://github.com/apache/iceberg-cpp/pull/490#discussion_r2664343291
########## src/iceberg/update/expire_snapshots.cc: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include <cstdint> +#include <iostream> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make( + std::shared_ptr<Transaction> transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create ExpireSnapshots without a transaction"); + return std::shared_ptr<ExpireSnapshots>(new ExpireSnapshots(std::move(transaction))); +} + +ExpireSnapshots::ExpireSnapshots( + [[maybe_unused]] std::shared_ptr<Transaction> transaction) Review Comment: ```suggestion std::shared_ptr<Transaction> transaction) ``` ########## src/iceberg/update/expire_snapshots.h: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cstdint> +#include <functional> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" +#include "iceberg/util/timepoint.h" + +/// \file iceberg/update/expire_snapshots.h +/// \brief API for removing old snapshots from a table. + +namespace iceberg { + +/// \brief An enum representing possible clean up levels used in snapshot expiration. +enum class CleanupLevel : uint8_t { + /// Skip all file cleanup, only remove snapshot metadata. + kNone, + /// Clean up only metadata files (manifests, manifest lists, statistics), retain data + /// files. + kMetadataOnly, + /// Clean up both metadata and data files (default). + kAll +}; + +/// \brief API for removing old snapshots from a table. +/// +/// This API accumulates snapshot deletions and commits the new list to the table. This +/// API does not allow deleting the current snapshot. +/// +/// When committing, these changes will be applied to the latest table metadata. 
Commit +/// conflicts will be resolved by applying the changes to the new latest metadata and +/// reattempting the commit. +/// +/// Manifest files that are no longer used by valid snapshots will be deleted. Data files +/// that were deleted by snapshots that are expired will be deleted. DeleteWith() can be +/// used to pass an alternative deletion method. +/// +/// Apply() returns a list of the snapshots that will be removed. +class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { + public: + static Result<std::shared_ptr<ExpireSnapshots>> Make( + std::shared_ptr<Transaction> transaction); + + ~ExpireSnapshots() override; + + using SnapshotToRef = std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>; + + struct ExpireSnapshotsResult { + SnapshotToRef ref_to_remove; + std::vector<int64_t> snapshot_ids_to_remove; + std::vector<int32_t> partition_spec_to_remove; + std::unordered_set<int32_t> schema_to_remove; + }; + + /// \brief Expires a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of the snapshot to expire. + /// \return Reference to this for method chaining. + ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id); + + /// \brief Expires all snapshots older than the given timestamp. + /// + /// \param timestamp_millis A long timestamp in milliseconds. + /// \return Reference to this for method chaining. + ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis); + + /// \brief Retains the most recent ancestors of the current snapshot. + /// + /// If a snapshot would be expired because it is older than the expiration timestamp, + /// but is one of the num_snapshots most recent ancestors of the current state, it will + /// be retained. This will not cause snapshots explicitly identified by id from + /// expiring. + /// + /// This may keep more than num_snapshots ancestors if snapshots are added concurrently. + /// This may keep less than num_snapshots ancestors if the current table state does not + /// have that many. 
+ /// + /// \param num_snapshots The number of snapshots to retain. + /// \return Reference to this for method chaining. + ExpireSnapshots& RetainLast(int num_snapshots); + + /// \brief Passes an alternative delete implementation that will be used for manifests + /// and data files. + /// + /// Manifest files that are no longer used by valid snapshots will be deleted. Data + /// files that were deleted by snapshots that are expired will be deleted. + /// + /// If this method is not called, unnecessary manifests and data files will still be + /// deleted. + /// + /// \param delete_func A function that will be called to delete manifests and data files + /// \return Reference to this for method chaining. + ExpireSnapshots& DeleteWith(std::function<void(const std::string&)> delete_func); + + /// \brief Configures the cleanup level for expired files. + /// + /// This method provides fine-grained control over which files are cleaned up during + /// snapshot expiration. + /// + /// Consider CleanupLevel::kMetadataOnly when data files are shared across tables or + /// when using procedures like add-files that may reference the same data files. + /// + /// Consider CleanupLevel::kNone when data and metadata files may be more efficiently + /// removed using a distributed framework through the actions API. + /// + /// \param level The cleanup level to use for expired snapshots. + /// \return Reference to this for method chaining. + ExpireSnapshots& CleanupLevel(enum CleanupLevel level); + + /// \brief Enable cleaning up unused metadata, such as partition specs, schemas, etc. + /// + /// \param clean Remove unused partition specs, schemas, or other metadata when true. + /// \return Reference to this for method chaining. 
+ ExpireSnapshots& CleanExpiredMetadata(bool clean); + + Kind kind() const final { return Kind::kExpireSnapshots; } + + /// \brief Apply the pending changes and return the results + /// \return The results of changes + Result<ExpireSnapshotsResult> Apply(); + + Status Commit() override; + + private: + explicit ExpireSnapshots([[maybe_unused]] std::shared_ptr<Transaction> transaction); + + Result<std::vector<int64_t>> ComputeBranchSnapshotsToRetain( + const Table& table, int64_t snapshot, int64_t expire_snapshot_older_than, + int32_t min_snapshots_to_keep); + + Result<std::vector<int64_t>> ComputeAllBranchSnapshotIds( + const Table& table, const SnapshotToRef& retained_refs); + + Result<std::vector<int64_t>> UnreferencedSnapshotIds( + const Table& table, const TableMetadata& current_metadata, + const SnapshotToRef& retained_refs); + + private: + // Internal state + int32_t default_min_num_snapshots_; + int64_t default_max_ref_age_ms_; + int64_t default_expire_older_than_; + TimePointMs current_time_ms_; + std::vector<int64_t> snapshot_ids_to_expire_; + std::function<void(const std::string&)> delete_func_; + enum CleanupLevel cleanup_level_ { CleanupLevel::kAll }; Review Comment: ```suggestion CleanupLevel cleanup_level_ { CleanupLevel::kAll }; ``` ########## src/iceberg/update/expire_snapshots.h: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include <cstdint> +#include <functional> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" +#include "iceberg/util/timepoint.h" + +/// \file iceberg/update/expire_snapshots.h +/// \brief API for removing old snapshots from a table. + +namespace iceberg { + +/// \brief An enum representing possible clean up levels used in snapshot expiration. +enum class CleanupLevel : uint8_t { + /// Skip all file cleanup, only remove snapshot metadata. + kNone, + /// Clean up only metadata files (manifests, manifest lists, statistics), retain data + /// files. + kMetadataOnly, + /// Clean up both metadata and data files (default). + kAll +}; + +/// \brief API for removing old snapshots from a table. +/// +/// This API accumulates snapshot deletions and commits the new list to the table. This +/// API does not allow deleting the current snapshot. +/// +/// When committing, these changes will be applied to the latest table metadata. Commit +/// conflicts will be resolved by applying the changes to the new latest metadata and +/// reattempting the commit. +/// +/// Manifest files that are no longer used by valid snapshots will be deleted. Data files +/// that were deleted by snapshots that are expired will be deleted. DeleteWith() can be +/// used to pass an alternative deletion method. +/// +/// Apply() returns a list of the snapshots that will be removed. 
+class ICEBERG_EXPORT ExpireSnapshots : public PendingUpdate { + public: + static Result<std::shared_ptr<ExpireSnapshots>> Make( + std::shared_ptr<Transaction> transaction); + + ~ExpireSnapshots() override; + + using SnapshotToRef = std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>; + + struct ExpireSnapshotsResult { + SnapshotToRef ref_to_remove; + std::vector<int64_t> snapshot_ids_to_remove; + std::vector<int32_t> partition_spec_to_remove; + std::unordered_set<int32_t> schema_to_remove; + }; + + /// \brief Expires a specific Snapshot identified by id. + /// + /// \param snapshot_id Long id of the snapshot to expire. + /// \return Reference to this for method chaining. + ExpireSnapshots& ExpireSnapshotId(int64_t snapshot_id); + + /// \brief Expires all snapshots older than the given timestamp. + /// + /// \param timestamp_millis A long timestamp in milliseconds. + /// \return Reference to this for method chaining. + ExpireSnapshots& ExpireOlderThan(int64_t timestamp_millis); + + /// \brief Retains the most recent ancestors of the current snapshot. + /// + /// If a snapshot would be expired because it is older than the expiration timestamp, + /// but is one of the num_snapshots most recent ancestors of the current state, it will + /// be retained. This will not cause snapshots explicitly identified by id from + /// expiring. + /// + /// This may keep more than num_snapshots ancestors if snapshots are added concurrently. + /// This may keep less than num_snapshots ancestors if the current table state does not + /// have that many. + /// + /// \param num_snapshots The number of snapshots to retain. + /// \return Reference to this for method chaining. + ExpireSnapshots& RetainLast(int num_snapshots); + + /// \brief Passes an alternative delete implementation that will be used for manifests + /// and data files. + /// + /// Manifest files that are no longer used by valid snapshots will be deleted. 
Data + /// files that were deleted by snapshots that are expired will be deleted. + /// + /// If this method is not called, unnecessary manifests and data files will still be + /// deleted. + /// + /// \param delete_func A function that will be called to delete manifests and data files + /// \return Reference to this for method chaining. + ExpireSnapshots& DeleteWith(std::function<void(const std::string&)> delete_func); + + /// \brief Configures the cleanup level for expired files. + /// + /// This method provides fine-grained control over which files are cleaned up during + /// snapshot expiration. + /// + /// Consider CleanupLevel::kMetadataOnly when data files are shared across tables or + /// when using procedures like add-files that may reference the same data files. + /// + /// Consider CleanupLevel::kNone when data and metadata files may be more efficiently + /// removed using a distributed framework through the actions API. + /// + /// \param level The cleanup level to use for expired snapshots. + /// \return Reference to this for method chaining. + ExpireSnapshots& CleanupLevel(enum CleanupLevel level); Review Comment: ```suggestion ExpireSnapshots& CleanupLevel(CleanupLevel level); ``` ########## src/iceberg/update/expire_snapshots.cc: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include <cstdint> +#include <iostream> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make( Review Comment: The Java implementation has the check below. Perhaps we should return an error when GC is not enabled? ```java ValidationException.check( PropertyUtil.propertyAsBoolean(base.properties(), GC_ENABLED, GC_ENABLED_DEFAULT), "Cannot expire snapshots: GC is disabled (deleting files may corrupt other tables)"); ``` ########## src/iceberg/update/expire_snapshots.cc: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include <cstdint> +#include <iostream> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make( + std::shared_ptr<Transaction> transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create ExpireSnapshots without a transaction"); + return std::shared_ptr<ExpireSnapshots>(new ExpireSnapshots(std::move(transaction))); +} + +ExpireSnapshots::ExpireSnapshots( + [[maybe_unused]] std::shared_ptr<Transaction> transaction) + : PendingUpdate(std::move(transaction)), + default_min_num_snapshots_( + transaction_->current().properties.Get(TableProperties::kMinSnapshotsToKeep)), + default_max_ref_age_ms_( + transaction_->current().properties.Get(TableProperties::kMaxRefAgeMs)), + current_time_ms_(std::chrono::time_point_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now())) { + auto max_snapshot_age_ms = + transaction_->current().properties.Get(TableProperties::kMaxSnapshotAgeMs); + default_expire_older_than_ = + current_time_ms_.time_since_epoch().count() - max_snapshot_age_ms; +} + +ExpireSnapshots::~ExpireSnapshots() = default; + +ExpireSnapshots& 
ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) { + const auto& current_metadata = transaction_->current(); + auto iter = std::ranges::find_if(current_metadata.snapshots, Review Comment: Consider using `TableMetadataCache` to cache snapshot ids for fast lookup. ########## src/iceberg/update/expire_snapshots.cc: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/update/expire_snapshots.h" + +#include <cstdint> +#include <iostream> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make( + std::shared_ptr<Transaction> transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create ExpireSnapshots without a transaction"); + return std::shared_ptr<ExpireSnapshots>(new ExpireSnapshots(std::move(transaction))); +} + +ExpireSnapshots::ExpireSnapshots( + [[maybe_unused]] std::shared_ptr<Transaction> transaction) + : PendingUpdate(std::move(transaction)), + default_min_num_snapshots_( + transaction_->current().properties.Get(TableProperties::kMinSnapshotsToKeep)), + default_max_ref_age_ms_( + transaction_->current().properties.Get(TableProperties::kMaxRefAgeMs)), + current_time_ms_(std::chrono::time_point_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now())) { + auto max_snapshot_age_ms = + transaction_->current().properties.Get(TableProperties::kMaxSnapshotAgeMs); + default_expire_older_than_ = + current_time_ms_.time_since_epoch().count() - max_snapshot_age_ms; +} + +ExpireSnapshots::~ExpireSnapshots() = default; + +ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) { + const auto& current_metadata = transaction_->current(); + auto iter = std::ranges::find_if(current_metadata.snapshots, + [&](const std::shared_ptr<Snapshot>& snapshot) { + return snapshot->snapshot_id == snapshot_id; + }); + + ICEBERG_BUILDER_CHECK(iter != current_metadata.snapshots.end(), + "Snapshot:{} not exist.", snapshot_id); + snapshot_ids_to_expire_.push_back(snapshot_id); + 
return *this; +} + +ExpireSnapshots& ExpireSnapshots::ExpireOlderThan(int64_t timestamp_millis) { + ICEBERG_BUILDER_CHECK(timestamp_millis > 0, "Timestamp must be positive: {}", + timestamp_millis); + default_expire_older_than_ = timestamp_millis; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::RetainLast(int num_snapshots) { + ICEBERG_BUILDER_CHECK(num_snapshots > 0, + "Number of snapshots to retain must be positive: {}", + num_snapshots); + default_min_num_snapshots_ = num_snapshots; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::DeleteWith( + std::function<void(const std::string&)> delete_func) { + ICEBERG_BUILDER_CHECK(delete_func != nullptr, "Delete function cannot be null"); + delete_func_ = std::move(delete_func); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) { + cleanup_level_ = level; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::CleanExpiredMetadata(bool clean) { + clean_expired_metadata_ = clean; + return *this; +} + +Result<std::vector<int64_t>> ExpireSnapshots::ComputeBranchSnapshotsToRetain( + const Table& table, int64_t snapshot, int64_t expire_snapshot_older_than, + int32_t min_snapshots_to_keep) { + std::vector<int64_t> snapshot_ids_to_retain; + ICEBERG_ASSIGN_OR_RAISE(auto snapshots, SnapshotUtil::AncestorsOf(table, snapshot)); Review Comment: We cannot use `table` here because it might be stale if this SnapshotUpdate is just a part of the transaction. Instead, we need to call `static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(const std::shared_ptr<Snapshot>& snapshot, const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup)` in this case. ########## src/iceberg/update/expire_snapshots.cc: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include <cstdint> +#include <iostream> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make( + std::shared_ptr<Transaction> transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create ExpireSnapshots without a transaction"); + return std::shared_ptr<ExpireSnapshots>(new ExpireSnapshots(std::move(transaction))); +} + +ExpireSnapshots::ExpireSnapshots( + [[maybe_unused]] std::shared_ptr<Transaction> transaction) + : PendingUpdate(std::move(transaction)), + default_min_num_snapshots_( + transaction_->current().properties.Get(TableProperties::kMinSnapshotsToKeep)), + default_max_ref_age_ms_( + transaction_->current().properties.Get(TableProperties::kMaxRefAgeMs)), + current_time_ms_(std::chrono::time_point_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now())) { + auto max_snapshot_age_ms = + transaction_->current().properties.Get(TableProperties::kMaxSnapshotAgeMs); + default_expire_older_than_ 
= + current_time_ms_.time_since_epoch().count() - max_snapshot_age_ms; +} + +ExpireSnapshots::~ExpireSnapshots() = default; + +ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) { + const auto& current_metadata = transaction_->current(); + auto iter = std::ranges::find_if(current_metadata.snapshots, + [&](const std::shared_ptr<Snapshot>& snapshot) { + return snapshot->snapshot_id == snapshot_id; + }); + + ICEBERG_BUILDER_CHECK(iter != current_metadata.snapshots.end(), + "Snapshot:{} not exist.", snapshot_id); + snapshot_ids_to_expire_.push_back(snapshot_id); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::ExpireOlderThan(int64_t timestamp_millis) { + ICEBERG_BUILDER_CHECK(timestamp_millis > 0, "Timestamp must be positive: {}", + timestamp_millis); + default_expire_older_than_ = timestamp_millis; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::RetainLast(int num_snapshots) { + ICEBERG_BUILDER_CHECK(num_snapshots > 0, + "Number of snapshots to retain must be positive: {}", + num_snapshots); + default_min_num_snapshots_ = num_snapshots; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::DeleteWith( + std::function<void(const std::string&)> delete_func) { + ICEBERG_BUILDER_CHECK(delete_func != nullptr, "Delete function cannot be null"); + delete_func_ = std::move(delete_func); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) { Review Comment: ```suggestion ExpireSnapshots& ExpireSnapshots::CleanupLevel(CleanupLevel level) { ``` ########## src/iceberg/update/expire_snapshots.cc: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include <cstdint> +#include <iostream> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make( + std::shared_ptr<Transaction> transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create ExpireSnapshots without a transaction"); + return std::shared_ptr<ExpireSnapshots>(new ExpireSnapshots(std::move(transaction))); +} + +ExpireSnapshots::ExpireSnapshots( + [[maybe_unused]] std::shared_ptr<Transaction> transaction) + : PendingUpdate(std::move(transaction)), + default_min_num_snapshots_( + transaction_->current().properties.Get(TableProperties::kMinSnapshotsToKeep)), + default_max_ref_age_ms_( + transaction_->current().properties.Get(TableProperties::kMaxRefAgeMs)), + current_time_ms_(std::chrono::time_point_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now())) { + auto max_snapshot_age_ms = + transaction_->current().properties.Get(TableProperties::kMaxSnapshotAgeMs); + default_expire_older_than_ 
= + current_time_ms_.time_since_epoch().count() - max_snapshot_age_ms; +} + +ExpireSnapshots::~ExpireSnapshots() = default; + +ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) { + const auto& current_metadata = transaction_->current(); + auto iter = std::ranges::find_if(current_metadata.snapshots, + [&](const std::shared_ptr<Snapshot>& snapshot) { + return snapshot->snapshot_id == snapshot_id; + }); + + ICEBERG_BUILDER_CHECK(iter != current_metadata.snapshots.end(), + "Snapshot:{} not exist.", snapshot_id); + snapshot_ids_to_expire_.push_back(snapshot_id); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::ExpireOlderThan(int64_t timestamp_millis) { + ICEBERG_BUILDER_CHECK(timestamp_millis > 0, "Timestamp must be positive: {}", + timestamp_millis); + default_expire_older_than_ = timestamp_millis; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::RetainLast(int num_snapshots) { + ICEBERG_BUILDER_CHECK(num_snapshots > 0, + "Number of snapshots to retain must be positive: {}", + num_snapshots); + default_min_num_snapshots_ = num_snapshots; + return *this; +} + +ExpireSnapshots& ExpireSnapshots::DeleteWith( + std::function<void(const std::string&)> delete_func) { + ICEBERG_BUILDER_CHECK(delete_func != nullptr, "Delete function cannot be null"); + delete_func_ = std::move(delete_func); + return *this; +} + +ExpireSnapshots& ExpireSnapshots::CleanupLevel(enum CleanupLevel level) { + cleanup_level_ = level; Review Comment: Java impl has this check: ``` Preconditions.checkArgument( cleanExpiredFiles || level == CleanupLevel.NONE, "Cannot set cleanupLevel to %s when cleanExpiredFiles was explicitly set to false", level); ``` ########## src/iceberg/update/expire_snapshots.cc: ########## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/expire_snapshots.h" + +#include <cstdint> +#include <iostream> +#include <memory> +#include <unordered_set> +#include <vector> + +#include "iceberg/result.h" +#include "iceberg/schema.h" +#include "iceberg/snapshot.h" +#include "iceberg/table.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" + +namespace iceberg { + +Result<std::shared_ptr<ExpireSnapshots>> ExpireSnapshots::Make( + std::shared_ptr<Transaction> transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create ExpireSnapshots without a transaction"); + return std::shared_ptr<ExpireSnapshots>(new ExpireSnapshots(std::move(transaction))); +} + +ExpireSnapshots::ExpireSnapshots( + [[maybe_unused]] std::shared_ptr<Transaction> transaction) + : PendingUpdate(std::move(transaction)), + default_min_num_snapshots_( + transaction_->current().properties.Get(TableProperties::kMinSnapshotsToKeep)), + default_max_ref_age_ms_( + transaction_->current().properties.Get(TableProperties::kMaxRefAgeMs)), + current_time_ms_(std::chrono::time_point_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now())) { + auto max_snapshot_age_ms = + transaction_->current().properties.Get(TableProperties::kMaxSnapshotAgeMs); + default_expire_older_than_ 
= + current_time_ms_.time_since_epoch().count() - max_snapshot_age_ms; +} + +ExpireSnapshots::~ExpireSnapshots() = default; + +ExpireSnapshots& ExpireSnapshots::ExpireSnapshotId(int64_t snapshot_id) { + const auto& current_metadata = transaction_->current(); + auto iter = std::ranges::find_if(current_metadata.snapshots, Review Comment: BTW, the Java implementation does not perform this check here. Should we do the same? Ignoring a non-existent snapshot ID is valid. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
