This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 6c781c7b feat(io): add bulk delete API to FileIO. (#659)
6c781c7b is described below
commit 6c781c7b1fa56a46f1f7994f18a5dbf8b62385b3
Author: slfan1989 <[email protected]>
AuthorDate: Thu May 21 16:53:55 2026 +0800
feat(io): add bulk delete API to FileIO. (#659)
## Summary
Add a new `FileIO::DeleteFiles(...)` API as a bulk deletion entry point.
The default implementation deletes files sequentially by calling the
existing `DeleteFile(...)` method and returns the first deletion error
encountered; files after the failing one are not attempted.
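This PR adds the API with a backward-compatible sequential fallback and
switches the `ExpireSnapshots` file cleanup (`FileCleanupStrategy::DeleteFiles`)
to call `FileIO::DeleteFiles(...)` instead of deleting files one at a time.
It does not introduce parallel deletion.
Note that this changes cleanup failure handling: the whole batch now shares a
single `try` block and a single `DeleteFiles(...)` call. As a result, an
exception from a custom delete callback, or an error from the sequential
fallback, stops the remaining deletions instead of being ignored per file.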
Fixed: #658
## Motivation
`ExpireSnapshots` and other cleanup flows may need to delete many files. A
bulk deletion API gives `FileIO` implementations a common extension point for
future optimized deletion strategies, such as storage-native batch deletion or
parallel fallback deletion.
---
src/iceberg/arrow/arrow_io.cc | 13 ++++++
src/iceberg/arrow/arrow_io_internal.h | 4 ++
src/iceberg/file_io.cc | 7 ++++
src/iceberg/file_io.h | 11 +++++
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/arrow_io_test.cc | 15 +++++++
src/iceberg/test/file_io_test.cc | 75 ++++++++++++++++++++++++++++++++++
src/iceberg/test/meson.build | 1 +
src/iceberg/update/expire_snapshots.cc | 18 ++++----
9 files changed, 134 insertions(+), 11 deletions(-)
diff --git a/src/iceberg/arrow/arrow_io.cc b/src/iceberg/arrow/arrow_io.cc
index a515f338..45ad4259 100644
--- a/src/iceberg/arrow/arrow_io.cc
+++ b/src/iceberg/arrow/arrow_io.cc
@@ -22,6 +22,7 @@
#include <limits>
#include <mutex>
#include <optional>
+#include <vector>
#include <arrow/buffer.h>
#include <arrow/filesystem/localfs.h>
@@ -568,6 +569,18 @@ Status ArrowFileSystemFileIO::DeleteFile(const
std::string& file_location) {
return {};
}
+Status ArrowFileSystemFileIO::DeleteFiles(
+ const std::vector<std::string>& file_locations) {
+ std::vector<std::string> paths;
+ paths.reserve(file_locations.size());
+ for (const auto& file_location : file_locations) {
+ ICEBERG_ASSIGN_OR_RAISE(auto path, ResolvePath(file_location));
+ paths.push_back(std::move(path));
+ }
+ ICEBERG_ARROW_RETURN_NOT_OK(arrow_fs_->DeleteFiles(paths));
+ return {};
+}
+
std::unique_ptr<FileIO> ArrowFileSystemFileIO::MakeMockFileIO() {
return std::make_unique<ArrowFileSystemFileIO>(
std::make_shared<::arrow::fs::internal::MockFileSystem>(
diff --git a/src/iceberg/arrow/arrow_io_internal.h
b/src/iceberg/arrow/arrow_io_internal.h
index 4f170a8a..a6b85b6c 100644
--- a/src/iceberg/arrow/arrow_io_internal.h
+++ b/src/iceberg/arrow/arrow_io_internal.h
@@ -23,6 +23,7 @@
#include <memory>
#include <optional>
#include <string>
+#include <vector>
#include <arrow/filesystem/type_fwd.h>
#include <arrow/io/type_fwd.h>
@@ -77,6 +78,9 @@ class ICEBERG_BUNDLE_EXPORT ArrowFileSystemFileIO : public
FileIO {
/// \brief Delete a file at the given location.
Status DeleteFile(const std::string& file_location) override;
+ /// \brief Delete files at the given locations.
+ Status DeleteFiles(const std::vector<std::string>& file_locations) override;
+
/// \brief Get the Arrow file system.
const std::shared_ptr<::arrow::fs::FileSystem>& fs() const { return
arrow_fs_; }
diff --git a/src/iceberg/file_io.cc b/src/iceberg/file_io.cc
index d76ffeb6..e4223182 100644
--- a/src/iceberg/file_io.cc
+++ b/src/iceberg/file_io.cc
@@ -100,4 +100,11 @@ Status FileIO::WriteFile(const std::string& file_location,
std::string_view cont
return FinishWithCloseStatus(std::move(status), stream->Close());
}
+Status FileIO::DeleteFiles(const std::vector<std::string>& file_locations) {
+ for (const auto& file_location : file_locations) {
+ ICEBERG_RETURN_UNEXPECTED(DeleteFile(file_location));
+ }
+ return {};
+}
+
} // namespace iceberg
diff --git a/src/iceberg/file_io.h b/src/iceberg/file_io.h
index e772b533..1f91fb0c 100644
--- a/src/iceberg/file_io.h
+++ b/src/iceberg/file_io.h
@@ -26,6 +26,7 @@
#include <span>
#include <string>
#include <string_view>
+#include <vector>
#include "iceberg/iceberg_export.h"
#include "iceberg/result.h"
@@ -154,6 +155,16 @@ class ICEBERG_EXPORT FileIO {
virtual Status DeleteFile(const std::string& file_location) {
return NotImplemented("DeleteFile not implemented");
}
+
+ /// \brief Delete files at the given locations.
+ ///
+ /// Implementations that can delete multiple files efficiently should
override this
+ /// method. The default implementation deletes files sequentially using
DeleteFile
+ /// and returns the first error encountered.
+ ///
+ /// \param file_locations The locations of the files to delete.
+ /// \return void if all deletes succeed, or an error code if any delete
fails.
+ virtual Status DeleteFiles(const std::vector<std::string>& file_locations);
};
} // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 791ad9be..37e66ed5 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -124,6 +124,7 @@ add_iceberg_test(util_test
data_file_set_test.cc
decimal_test.cc
endian_test.cc
+ file_io_test.cc
formatter_test.cc
lazy_test.cc
location_util_test.cc
diff --git a/src/iceberg/test/arrow_io_test.cc
b/src/iceberg/test/arrow_io_test.cc
index 0c885d07..7edaf075 100644
--- a/src/iceberg/test/arrow_io_test.cc
+++ b/src/iceberg/test/arrow_io_test.cc
@@ -21,6 +21,7 @@
#include <array>
#include <memory>
#include <string>
+#include <vector>
#include <arrow/filesystem/localfs.h>
#include <arrow/result.h>
@@ -341,6 +342,20 @@ TEST_F(LocalFileIOTest, DeleteFile) {
EXPECT_THAT(del_res, HasErrorMessage("Cannot delete file"));
}
+TEST_F(LocalFileIOTest, DeleteFiles) {
+ auto first_path = CreateNewTempFilePath();
+ auto second_path = CreateNewTempFilePath();
+ ASSERT_THAT(file_io_->WriteFile(first_path, "hello"), IsOk());
+ ASSERT_THAT(file_io_->WriteFile(second_path, "world"), IsOk());
+
+ std::vector<std::string> paths = {first_path, second_path};
+ EXPECT_THAT(file_io_->DeleteFiles(paths), IsOk());
+
+ EXPECT_THAT(file_io_->ReadFile(first_path, std::nullopt),
IsError(ErrorKind::kIOError));
+ EXPECT_THAT(file_io_->ReadFile(second_path, std::nullopt),
+ IsError(ErrorKind::kIOError));
+}
+
void VerifyReadFullyReadsFromAbsolutePosition(const std::shared_ptr<FileIO>&
file_io,
const std::string& path) {
ASSERT_THAT(file_io->WriteFile(path, "abcdef"), IsOk());
diff --git a/src/iceberg/test/file_io_test.cc b/src/iceberg/test/file_io_test.cc
new file mode 100644
index 00000000..0908572f
--- /dev/null
+++ b/src/iceberg/test/file_io_test.cc
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/file_io.h"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/test/matchers.h"
+
+namespace iceberg {
+namespace {
+
+class RecordingFileIO : public FileIO {
+ public:
+ explicit RecordingFileIO(std::string failure_path = "")
+ : failure_path_(std::move(failure_path)) {}
+
+ Status DeleteFile(const std::string& file_location) override {
+ deleted_paths.push_back(file_location);
+ if (file_location == failure_path_) {
+ return IOError("failed to delete {}", file_location);
+ }
+ return {};
+ }
+
+ std::vector<std::string> deleted_paths;
+
+ private:
+ std::string failure_path_;
+};
+
+} // namespace
+
+TEST(FileIOTest, DeleteFilesFallsBackToDeleteFileForEachPath) {
+ RecordingFileIO file_io;
+ std::vector<std::string> paths = {"file-a.avro", "file-b.avro"};
+
+ EXPECT_THAT(file_io.DeleteFiles(paths), IsOk());
+ EXPECT_THAT(file_io.deleted_paths,
+ ::testing::ElementsAre("file-a.avro", "file-b.avro"));
+}
+
+TEST(FileIOTest, DeleteFilesReturnsFirstDeleteFileError) {
+ RecordingFileIO file_io("file-b.avro");
+ std::vector<std::string> paths = {"file-a.avro", "file-b.avro",
"file-c.avro"};
+
+ auto status = file_io.DeleteFiles(paths);
+
+ EXPECT_THAT(status, IsError(ErrorKind::kIOError));
+ EXPECT_THAT(status, HasErrorMessage("failed to delete file-b.avro"));
+ EXPECT_THAT(file_io.deleted_paths,
+ ::testing::ElementsAre("file-a.avro", "file-b.avro"));
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index e168d08b..1acb46e9 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -88,6 +88,7 @@ iceberg_tests = {
'data_file_set_test.cc',
'decimal_test.cc',
'endian_test.cc',
+ 'file_io_test.cc',
'formatter_test.cc',
'lazy_test.cc',
'location_util_test.cc',
diff --git a/src/iceberg/update/expire_snapshots.cc
b/src/iceberg/update/expire_snapshots.cc
index 8e109d08..c9ac9e4c 100644
--- a/src/iceberg/update/expire_snapshots.cc
+++ b/src/iceberg/update/expire_snapshots.cc
@@ -98,26 +98,22 @@ class FileCleanupStrategy {
return expired;
}
- /// \brief Delete a single file
- void DeleteFile(const std::string& path) {
+ /// \brief Delete files at the given locations.
+ void DeleteFiles(const std::unordered_set<std::string>& paths) {
try {
if (delete_func_) {
- delete_func_(path);
+ for (const auto& path : paths) {
+ delete_func_(path);
+ }
} else {
- std::ignore = file_io_->DeleteFile(path);
+ std::vector<std::string> path_list(paths.begin(), paths.end());
+ std::ignore = file_io_->DeleteFiles(path_list);
}
} catch (...) {
// TODO(shangxinli): add retry
}
}
- // TODO(shangxinli): Add bulk deletion
- void DeleteFiles(const std::unordered_set<std::string>& paths) {
- for (const auto& path : paths) {
- DeleteFile(path);
- }
- }
-
bool HasAnyStatisticsFiles(const TableMetadata& metadata) const {
return !metadata.statistics.empty() ||
!metadata.partition_statistics.empty();
}