This is an automated email from the ASF dual-hosted git repository.
plat1ko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 45661f941e8 [feature](Cloud) Introduce obj storage client interface to
recycler (#35447)
45661f941e8 is described below
commit 45661f941e80265039429b26bf8bb6ef8ec410d5
Author: AlexYue <[email protected]>
AuthorDate: Mon Jun 3 20:12:14 2024 +0800
[feature](Cloud) Introduce obj storage client interface to recycler (#35447)
Extract basic interface to suit different kinds of ObjectStorage.
---
be/src/common/status.h | 18 +-
be/src/io/fs/err_utils.cpp | 7 +
be/src/io/fs/obj_storage_client.h | 24 +-
be/src/io/fs/s3_file_reader.cpp | 5 +-
be/src/io/fs/s3_file_system.cpp | 73 ++--
be/src/io/fs/s3_file_writer.cpp | 16 +-
be/src/io/fs/s3_obj_storage_client.cpp | 112 ++++--
be/src/io/fs/s3_obj_storage_client.h | 7 +-
be/test/io/fs/s3_file_writer_test.cpp | 17 +-
cloud/src/recycler/obj_store_accessor.h | 57 +++
cloud/src/recycler/s3_accessor.cpp | 389 ++++-----------------
cloud/src/recycler/s3_accessor.h | 6 +-
cloud/src/recycler/s3_obj_client.cpp | 371 ++++++++++++++++++++
.../src/recycler/s3_obj_client.h | 48 ++-
cloud/test/s3_accessor_test.cpp | 18 +
15 files changed, 715 insertions(+), 453 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 137a268b5aa..34e13749165 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -25,6 +25,14 @@
namespace doris {
+namespace io {
+struct ObjectStorageStatus;
+}
+
+class Status;
+
+extern io::ObjectStorageStatus convert_to_obj_response(Status st);
+
class PStatus;
namespace ErrorCode {
@@ -352,11 +360,11 @@ public:
Status() : _code(ErrorCode::OK), _err_msg(nullptr) {}
// used to convert Exception to Status
- Status(int code, std::string msg, std::string stack) : _code(code) {
+ Status(int code, std::string msg, std::string stack = "") : _code(code) {
_err_msg = std::make_unique<ErrMsg>();
- _err_msg->_msg = msg;
+ _err_msg->_msg = std::move(msg);
#ifdef ENABLE_STACKTRACE
- _err_msg->_stack = stack;
+ _err_msg->_stack = std::move(stack);
#endif
}
@@ -529,6 +537,10 @@ public:
std::string_view msg() const { return _err_msg ? _err_msg->_msg :
std::string_view(""); }
+ std::pair<int, std::string> retrieve_error_msg() { return {_code,
std::move(_err_msg->_msg)}; }
+
+ friend io::ObjectStorageStatus convert_to_obj_response(Status st);
+
private:
int _code;
struct ErrMsg {
diff --git a/be/src/io/fs/err_utils.cpp b/be/src/io/fs/err_utils.cpp
index 8552c647cdd..6552d454824 100644
--- a/be/src/io/fs/err_utils.cpp
+++ b/be/src/io/fs/err_utils.cpp
@@ -27,10 +27,17 @@
#include "common/status.h"
#include "io/fs/hdfs.h"
+#include "io/fs/obj_storage_client.h"
namespace doris {
using namespace ErrorCode;
+io::ObjectStorageStatus convert_to_obj_response(Status st) {
+ int code = st._code;
+ std::string msg = st._err_msg == nullptr ? "" :
std::move(st._err_msg->_msg);
+ return io::ObjectStorageStatus {.code = code, .msg = std::move(msg)};
+}
+
namespace io {
std::string errno_to_str() {
diff --git a/be/src/io/fs/obj_storage_client.h
b/be/src/io/fs/obj_storage_client.h
index 40e0ff9a8fe..3ab0a8e2dea 100644
--- a/be/src/io/fs/obj_storage_client.h
+++ b/be/src/io/fs/obj_storage_client.h
@@ -46,14 +46,26 @@ struct ObjectStoragePathOptions {
struct ObjectCompleteMultiParts {};
+struct ObjectStorageStatus {
+ int code = 0;
+ std::string msg = std::string();
+};
+
+// We only store error code along with err_msg instead of Status to unify BE
and recycler's error handle logic
struct ObjectStorageResponse {
- Status status = Status::OK();
+ ObjectStorageStatus status {};
+ int http_code {200};
+ std::string request_id = std::string();
+};
+
+struct ObjectStorageUploadResponse {
+ ObjectStorageResponse resp {};
std::optional<std::string> upload_id = std::nullopt;
std::optional<std::string> etag = std::nullopt;
};
struct ObjectStorageHeadResponse {
- Status status = Status::OK();
+ ObjectStorageResponse resp {};
long long file_size {0};
};
@@ -62,7 +74,8 @@ public:
virtual ~ObjStorageClient() = default;
// Create a multi-part upload request. On AWS-compatible systems, it will
return an upload ID, but not on Azure.
// The input parameters should include the bucket and key for the object
storage.
- virtual ObjectStorageResponse create_multipart_upload(const
ObjectStoragePathOptions& opts) = 0;
+ virtual ObjectStorageUploadResponse create_multipart_upload(
+ const ObjectStoragePathOptions& opts) = 0;
// To directly upload a piece of data to object storage and generate a
user-visible file.
// You need to clearly specify the bucket and key
virtual ObjectStorageResponse put_object(const ObjectStoragePathOptions&
opts,
@@ -71,8 +84,8 @@ public:
// The temporary file's ID is the value of the part_num passed in
// You need to specify the bucket and key along with the upload_id if it's
AWS-compatible system
// For the same bucket and key, as well as the same part_num, it will
directly replace the original temporary file.
- virtual ObjectStorageResponse upload_part(const ObjectStoragePathOptions&
opts,
- std::string_view stream, int
part_num) = 0;
+ virtual ObjectStorageUploadResponse upload_part(const
ObjectStoragePathOptions& opts,
+ std::string_view stream,
int part_num) = 0;
// To combine the previously uploaded multiple file parts into a complete
file, the file name is the name of the key passed in.
// If it is an AWS-compatible system, the upload_id needs to be included.
// After a successful execution, the large file can be accessed in the
object storage
@@ -88,6 +101,7 @@ public:
size_t offset, size_t bytes_read,
size_t* size_return) = 0;
// According to the passed bucket and prefix, it traverses and retrieves
all files under the prefix, and returns the name and file size of all files.
+ // **Notice**: The files returned by this function contains the full key
in object storage.
virtual ObjectStorageResponse list_objects(const ObjectStoragePathOptions&
opts,
std::vector<FileInfo>* files) =
0;
// According to the bucket and prefix specified by the user, it performs
batch deletion based on the object names in the object array.
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 1ea6d5c3c03..e7775803198 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -117,8 +117,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_rea
auto resp = client->get_object( { .bucket = _bucket, .key = _key, },
to, offset, bytes_req, bytes_read);
// clang-format on
- if (!resp.status.ok()) {
- return resp.status.append(fmt::format("failed to read from {}",
_path.native()));
+ if (resp.status.code != ErrorCode::OK) {
+ return std::move(Status(resp.status.code, std::move(resp.status.msg))
+ .append(fmt::format("failed to read from {}",
_path.native())));
}
if (*bytes_read != bytes_req) {
return Status::InternalError("failed to read from {}(bytes read: {},
bytes req: {})",
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index 2cd40ea87a6..27aff992f4c 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -131,9 +131,10 @@ Result<int64_t> ObjClientHolder::object_file_size(const
std::string& bucket,
.key = key,
});
- if (!resp.status.ok()) {
- return ResultError(resp.status.append(
- fmt::format("failed to head s3 file {}", full_s3_path(bucket,
key))));
+ if (resp.resp.status.code != ErrorCode::OK) {
+ return ResultError(std::move(Status(resp.resp.status.code,
std::move(resp.resp.status.msg))
+ .append(fmt::format("failed to
head s3 file {}",
+
full_s3_path(bucket, key)))));
}
return resp.file_size;
@@ -207,10 +208,11 @@ Status S3FileSystem::delete_file_impl(const Path& file) {
auto resp = client->delete_object({.bucket = _bucket, .key = key});
- if (resp.status.ok() || resp.status.is<ErrorCode::NOT_FOUND>()) {
+ if (resp.status.code == ErrorCode::OK || resp.status.code ==
ErrorCode::NOT_FOUND) {
return Status::OK();
}
- return resp.status.append(fmt::format("failed to delete file {}",
full_s3_path(key)));
+ return std::move(Status(resp.status.code, std::move(resp.status.msg))
+ .append(fmt::format("failed to delete file {}",
full_s3_path(key))));
}
Status S3FileSystem::delete_directory_impl(const Path& dir) {
@@ -222,13 +224,12 @@ Status S3FileSystem::delete_directory_impl(const Path&
dir) {
prefix.push_back('/');
}
- return client
- ->delete_objects_recursively({
- .path = full_s3_path(prefix),
- .bucket = _bucket,
- .prefix = prefix,
- })
- .status;
+ auto resp = client->delete_objects_recursively({
+ .path = full_s3_path(prefix),
+ .bucket = _bucket,
+ .prefix = prefix,
+ });
+ return {resp.status.code, std::move(resp.status.msg)};
}
Status S3FileSystem::batch_delete_impl(const std::vector<Path>& remote_files) {
@@ -251,8 +252,9 @@ Status S3FileSystem::batch_delete_impl(const
std::vector<Path>& remote_files) {
return Status::OK();
}
// clang-format off
- RETURN_IF_ERROR(client->delete_objects( {.bucket = _bucket,},
std::move(objects))
- .status);
+ if (auto resp = client->delete_objects( {.bucket = _bucket,},
std::move(objects)); resp.status.code != ErrorCode::OK) {
+ return {resp.status.code, std::move(resp.status.msg)};
+ }
// clang-format on
} while (path_iter != remote_files.end());
@@ -266,12 +268,14 @@ Status S3FileSystem::exists_impl(const Path& path, bool*
res) const {
auto resp = client->head_object({.bucket = _bucket, .key = key});
- if (resp.status.ok()) {
+ if (resp.resp.status.code == ErrorCode::OK) {
*res = true;
- } else if (resp.status.is<ErrorCode::NOT_FOUND>()) {
+ } else if (resp.resp.status.code == ErrorCode::NOT_FOUND) {
*res = false;
} else {
- return resp.status.append(fmt::format("failed to check exists {}",
full_s3_path(key)));
+ return std::move(
+ Status(resp.resp.status.code, std::move(resp.resp.status.msg))
+ .append(fmt::format(" failed to check exists {}",
full_s3_path(key))));
}
return Status::OK();
}
@@ -297,8 +301,13 @@ Status S3FileSystem::list_impl(const Path& dir, bool
only_file, std::vector<File
// clang-format off
auto resp = client->list_objects( {.bucket = _bucket, .prefix = prefix,},
files);
// clang-format on
+ if (resp.status.code == ErrorCode::OK) {
+ for (auto&& file : *files) {
+ file.file_name.erase(0, prefix.size());
+ }
+ }
- return resp.status;
+ return {resp.status.code, std::move(resp.status.msg)};
}
Status S3FileSystem::rename_impl(const Path& orig_name, const Path& new_name) {
@@ -347,10 +356,13 @@ Status S3FileSystem::batch_upload_impl(const
std::vector<Path>& local_files,
std::vector<FileWriterPtr> obj_writers(local_files.size());
- auto upload_task = [this](Path local_file, Path remote_file,
FileWriterPtr* obj_writer) {
+ auto upload_task = [&, this](size_t idx) {
+ const auto& local_file = local_files[idx];
+ const auto& remote_file = remote_files[idx];
+ auto& obj_writer = obj_writers[idx];
auto key = DORIS_TRY(get_key(remote_file));
LOG(INFO) << "Start to upload " << local_file.native() << " to " <<
full_s3_path(key);
- RETURN_IF_ERROR(create_file_impl(key, obj_writer, nullptr));
+ RETURN_IF_ERROR(create_file_impl(key, &obj_writer, nullptr));
FileReaderSPtr local_reader;
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(local_file,
&local_reader));
size_t local_buffer_size =
config::s3_file_system_local_upload_buffer_size;
@@ -360,24 +372,18 @@ Status S3FileSystem::batch_upload_impl(const
std::vector<Path>& local_files,
size_t bytes_read = 0;
RETURN_IF_ERROR(local_reader->read_at(
cur_read, Slice {write_buffer.get(), local_buffer_size},
&bytes_read));
- RETURN_IF_ERROR((*obj_writer)->append({write_buffer.get(),
bytes_read}));
+ RETURN_IF_ERROR((*obj_writer).append({write_buffer.get(),
bytes_read}));
cur_read += bytes_read;
}
- RETURN_IF_ERROR((*obj_writer)->close());
+ RETURN_IF_ERROR((*obj_writer).close());
return Status::OK();
};
std::vector<std::future<Status>> futures;
for (int i = 0; i < local_files.size(); ++i) {
- std::shared_ptr<std::packaged_task<Status(Path local_file, Path
remote_file,
- FileWriterPtr * obj_writer)>>
- task = std::make_shared<std::packaged_task<Status(Path
local_file, Path remote_file,
-
FileWriterPtr * obj_writer)>>(
- upload_task);
+ auto task = std::make_shared<std::packaged_task<Status(size_t
idx)>>(upload_task);
futures.emplace_back(task->get_future());
- default_executor()->Submit(
- [t = std::move(task), local = local_files[i], remote =
remote_files[i],
- obj_writer = &obj_writers[i]]() mutable { (*t)(local, remote,
obj_writer); });
+ default_executor()->Submit([t = std::move(task), idx = i]() mutable {
(*t)(idx); });
}
Status s = Status::OK();
for (auto&& f : futures) {
@@ -401,16 +407,15 @@ Status S3FileSystem::download_impl(const Path&
remote_file, const Path& local_fi
auto resp = client->get_object( {.bucket = _bucket, .key = key,},
buf.get(), 0, size, &bytes_read);
// clang-format on
- if (!resp.status.ok()) {
- return resp.status;
+ if (resp.status.code != ErrorCode::OK) {
+ return {resp.status.code, std::move(resp.status.msg)};
}
Aws::OFStream local_file_s;
local_file_s.open(local_file, std::ios::out | std::ios::binary);
if (local_file_s.good()) {
local_file_s << StringViewStream(buf.get(), size).rdbuf();
} else {
- return Status::IOError("failed to download {}: failed to write file:
{}",
- remote_file.native(), local_file.native());
+ return localfs_error(errno, fmt::format("failed to write file {}",
local_file.native()));
}
return Status::OK();
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index 32319235964..91b5aace12b 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -98,10 +98,10 @@ Status S3FileWriter::_create_multi_upload_request() {
return Status::InternalError<false>("invalid obj storage client");
}
auto resp = client->create_multipart_upload(_obj_storage_path_opts);
- if (resp.status.ok()) {
+ if (resp.resp.status.code == ErrorCode::OK) {
_obj_storage_path_opts.upload_id = resp.upload_id;
}
- return resp.status;
+ return {resp.resp.status.code, std::move(resp.resp.status.msg)};
}
void S3FileWriter::_wait_until_finish(std::string_view task_name) {
@@ -304,8 +304,8 @@ void S3FileWriter::_upload_one_part(int64_t part_num,
UploadFileBuffer& buf) {
return;
}
auto resp = client->upload_part(_obj_storage_path_opts,
buf.get_string_view_data(), part_num);
- if (!resp.status.ok()) {
- buf.set_status(std::move(resp.status));
+ if (resp.resp.status.code != ErrorCode::OK) {
+ buf.set_status(Status(resp.resp.status.code,
std::move(resp.resp.status.msg)));
return;
}
s3_bytes_written_total << buf.get_size();
@@ -353,8 +353,8 @@ Status S3FileWriter::_complete() {
TEST_SYNC_POINT_CALLBACK("S3FileWriter::_complete:2",
&_completed_parts);
auto resp = client->complete_multipart_upload(
_obj_storage_path_opts, S3CompleteMultiParts {.parts =
_completed_parts});
- if (!resp.status.ok()) {
- return resp.status;
+ if (resp.status.code != ErrorCode::OK) {
+ return {resp.status.code, std::move(resp.status.msg)};
}
}
s3_file_created_total << 1;
@@ -388,8 +388,8 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
}
TEST_SYNC_POINT_RETURN_WITH_VOID("S3FileWriter::_put_object", this, &buf);
auto resp = client->put_object(_obj_storage_path_opts,
buf.get_string_view_data());
- if (!resp.status.ok()) {
- buf.set_status(std::move(resp.status));
+ if (resp.status.code != ErrorCode::OK) {
+ buf.set_status({resp.status.code, std::move(resp.status.msg)});
return;
}
s3_file_created_total << 1;
diff --git a/be/src/io/fs/s3_obj_storage_client.cpp
b/be/src/io/fs/s3_obj_storage_client.cpp
index 33d15e516b4..c45074e262c 100644
--- a/be/src/io/fs/s3_obj_storage_client.cpp
+++ b/be/src/io/fs/s3_obj_storage_client.cpp
@@ -82,9 +82,8 @@ using Aws::S3::Model::UploadPartOutcome;
namespace doris::io {
using namespace Aws::S3::Model;
-using Aws::S3::S3Client;
-ObjectStorageResponse S3ObjStorageClient::create_multipart_upload(
+ObjectStorageUploadResponse S3ObjStorageClient::create_multipart_upload(
const ObjectStoragePathOptions& opts) {
CreateMultipartUploadRequest create_request;
create_request.WithBucket(opts.bucket).WithKey(opts.key);
@@ -97,13 +96,19 @@ ObjectStorageResponse
S3ObjStorageClient::create_multipart_upload(
SYNC_POINT_CALLBACK("s3_file_writer::_open", &outcome);
if (outcome.IsSuccess()) {
- return ObjectStorageResponse {.upload_id
{outcome.GetResult().GetUploadId()}};
+ return ObjectStorageUploadResponse {.upload_id
{outcome.GetResult().GetUploadId()}};
}
- return ObjectStorageResponse {
- .status = s3fs_error(
- outcome.GetError(),
- fmt::format("failed to create multipart upload {} ",
opts.path.native()))};
+
+ return ObjectStorageUploadResponse {
+ .resp = {convert_to_obj_response(
+ s3fs_error(outcome.GetError(),
+ fmt::format("failed to create
multipart upload {} ",
+ opts.path.native()))),
+ static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()},
+ };
}
+
ObjectStorageResponse S3ObjStorageClient::put_object(const
ObjectStoragePathOptions& opts,
std::string_view stream) {
Aws::S3::Model::PutObjectRequest request;
@@ -122,12 +127,15 @@ ObjectStorageResponse
S3ObjStorageClient::put_object(const ObjectStoragePathOpti
auto st = s3fs_error(response.GetError(),
fmt::format("failed to put object {}",
opts.path.native()));
LOG(WARNING) << st;
- return ObjectStorageResponse {.status = std::move(st)};
+ return ObjectStorageResponse {convert_to_obj_response(std::move(st)),
+
static_cast<int>(response.GetError().GetResponseCode()),
+ response.GetError().GetRequestId()};
}
return {};
}
-ObjectStorageResponse S3ObjStorageClient::upload_part(const
ObjectStoragePathOptions& opts,
- std::string_view stream,
int part_num) {
+
+ObjectStorageUploadResponse S3ObjStorageClient::upload_part(const
ObjectStoragePathOptions& opts,
+ std::string_view
stream, int part_num) {
UploadPartRequest upload_request;
upload_request.WithBucket(opts.bucket)
.WithKey(opts.key)
@@ -160,10 +168,14 @@ ObjectStorageResponse
S3ObjStorageClient::upload_part(const ObjectStoragePathOpt
upload_part_outcome.GetError().GetExceptionName(),
upload_part_outcome.GetError().GetResponseCode());
LOG_WARNING(s.to_string());
- return ObjectStorageResponse {.status = std::move(s)};
+ return ObjectStorageUploadResponse {
+ .resp = {convert_to_obj_response(std::move(s)),
+
static_cast<int>(upload_part_outcome.GetError().GetResponseCode()),
+ upload_part_outcome.GetError().GetRequestId()}};
}
- return ObjectStorageResponse {.etag =
upload_part_outcome.GetResult().GetETag()};
+ return ObjectStorageUploadResponse {.etag =
upload_part_outcome.GetResult().GetETag()};
}
+
ObjectStorageResponse S3ObjStorageClient::complete_multipart_upload(
const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts&
completed_parts) {
CompleteMultipartUploadRequest complete_request;
@@ -188,10 +200,13 @@ ObjectStorageResponse
S3ObjStorageClient::complete_multipart_upload(
fmt::format("failed to complete multi part upload
{}, upload_id={}",
opts.path.native(), *opts.upload_id));
LOG(WARNING) << st;
- return {.status = std::move(st)};
+ return {convert_to_obj_response(std::move(st)),
+
static_cast<int>(complete_outcome.GetError().GetResponseCode()),
+ complete_outcome.GetError().GetRequestId()};
}
return {};
}
+
ObjectStorageHeadResponse S3ObjStorageClient::head_object(const
ObjectStoragePathOptions& opts) {
Aws::S3::Model::HeadObjectRequest request;
request.WithBucket(opts.bucket).WithKey(opts.key);
@@ -200,15 +215,20 @@ ObjectStorageHeadResponse
S3ObjStorageClient::head_object(const ObjectStoragePat
auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
_client->HeadObject(request), "s3_file_system::head_object",
std::ref(request).get());
if (outcome.IsSuccess()) {
- return {.status = Status::OK(), .file_size =
outcome.GetResult().GetContentLength()};
+ return {.resp = {convert_to_obj_response(Status::OK())},
+ .file_size = outcome.GetResult().GetContentLength()};
} else if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
- return {.status = Status::NotFound("")};
+ return {.resp = {convert_to_obj_response(Status::NotFound(""))}};
} else {
- return {.status = s3fs_error(outcome.GetError(),
- fmt::format("failed to check exists {}",
opts.key))};
+ return {.resp = {convert_to_obj_response(
+ s3fs_error(outcome.GetError(),
+ fmt::format("failed to check
exists {}", opts.key))),
+
static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()}};
}
return {};
}
+
ObjectStorageResponse S3ObjStorageClient::get_object(const
ObjectStoragePathOptions& opts,
void* buffer, size_t
offset, size_t bytes_read,
size_t* size_return) {
@@ -220,17 +240,21 @@ ObjectStorageResponse
S3ObjStorageClient::get_object(const ObjectStoragePathOpti
SCOPED_BVAR_LATENCY(s3_bvar::s3_get_latency);
auto outcome = _client->GetObject(request);
if (!outcome.IsSuccess()) {
- return {.status = s3fs_error(outcome.GetError(),
- fmt::format("failed to read from {}",
opts.path.native()))};
+ return {convert_to_obj_response(
+ s3fs_error(outcome.GetError(),
+ fmt::format("failed to read from {}",
opts.path.native()))),
+ static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()};
}
*size_return = outcome.GetResult().GetContentLength();
if (*size_return != bytes_read) {
- return {.status = Status::InternalError(
- "failed to read from {}(bytes read: {}, bytes req:
{})", opts.path.native(),
- *size_return, bytes_read)};
+ return {convert_to_obj_response(
+ Status::InternalError("failed to read from {}(bytes read: {},
bytes req: {})",
+ opts.path.native(), *size_return,
bytes_read))};
}
return {};
}
+
ObjectStorageResponse S3ObjStorageClient::list_objects(const
ObjectStoragePathOptions& opts,
std::vector<FileInfo>*
files) {
Aws::S3::Model::ListObjectsV2Request request;
@@ -243,8 +267,11 @@ ObjectStorageResponse
S3ObjStorageClient::list_objects(const ObjectStoragePathOp
outcome = _client->ListObjectsV2(request);
}
if (!outcome.IsSuccess()) {
- return {.status = s3fs_error(outcome.GetError(),
- fmt::format("failed to list {}",
opts.prefix))};
+ files->clear();
+ return {convert_to_obj_response(s3fs_error(
+ outcome.GetError(), fmt::format("failed to list
{}", opts.prefix))),
+ static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()};
}
for (const auto& obj : outcome.GetResult().GetContents()) {
std::string key = obj.GetKey();
@@ -260,6 +287,7 @@ ObjectStorageResponse
S3ObjStorageClient::list_objects(const ObjectStoragePathOp
} while (is_trucated);
return {};
}
+
ObjectStorageResponse S3ObjStorageClient::delete_objects(const
ObjectStoragePathOptions& opts,
std::vector<std::string> objs) {
Aws::S3::Model::DeleteObjectsRequest delete_request;
@@ -276,16 +304,20 @@ ObjectStorageResponse
S3ObjStorageClient::delete_objects(const ObjectStoragePath
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
auto delete_outcome = _client->DeleteObjects(delete_request);
if (!delete_outcome.IsSuccess()) {
- return {.status = s3fs_error(delete_outcome.GetError(),
- fmt::format("failed to delete dir {}",
opts.key))};
+ return {convert_to_obj_response(
+ s3fs_error(delete_outcome.GetError(),
+ fmt::format("failed to delete dir {}",
opts.key))),
+ static_cast<int>(delete_outcome.GetError().GetResponseCode()),
+ delete_outcome.GetError().GetRequestId()};
}
if (!delete_outcome.GetResult().GetErrors().empty()) {
const auto& e = delete_outcome.GetResult().GetErrors().front();
- return {.status = Status::InternalError("failed to delete object {}:
{}", e.GetKey(),
- e.GetMessage())};
+ return {convert_to_obj_response(Status::InternalError("failed to
delete object {}: {}",
+ e.GetKey(),
e.GetMessage()))};
}
return {};
}
+
ObjectStorageResponse S3ObjStorageClient::delete_object(const
ObjectStoragePathOptions& opts) {
Aws::S3::Model::DeleteObjectRequest request;
request.WithBucket(opts.bucket).WithKey(opts.key);
@@ -296,8 +328,10 @@ ObjectStorageResponse
S3ObjStorageClient::delete_object(const ObjectStoragePathO
outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
return {};
}
- return {.status = s3fs_error(outcome.GetError(),
- fmt::format("failed to delete file {}",
opts.key))};
+ return {convert_to_obj_response(s3fs_error(outcome.GetError(),
+ fmt::format("failed to delete
file {}", opts.key))),
+ static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()};
}
ObjectStorageResponse S3ObjStorageClient::delete_objects_recursively(
@@ -314,9 +348,11 @@ ObjectStorageResponse
S3ObjStorageClient::delete_objects_recursively(
outcome = _client->ListObjectsV2(request);
}
if (!outcome.IsSuccess()) {
- return {.status = s3fs_error(
+ return {convert_to_obj_response(s3fs_error(
outcome.GetError(),
- fmt::format("failed to list objects when delete
dir {}", opts.prefix))};
+ fmt::format("failed to list objects when delete
dir {}", opts.prefix))),
+ static_cast<int>(outcome.GetError().GetResponseCode()),
+ outcome.GetError().GetRequestId()};
}
const auto& result = outcome.GetResult();
Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
@@ -331,13 +367,16 @@ ObjectStorageResponse
S3ObjStorageClient::delete_objects_recursively(
SCOPED_BVAR_LATENCY(s3_bvar::s3_delete_latency);
auto delete_outcome = _client->DeleteObjects(delete_request);
if (!delete_outcome.IsSuccess()) {
- return {.status = s3fs_error(delete_outcome.GetError(),
- fmt::format("failed to delete dir
{}", opts.key))};
+ return {convert_to_obj_response(
+ s3fs_error(delete_outcome.GetError(),
+ fmt::format("failed to delete dir
{}", opts.key))),
+
static_cast<int>(delete_outcome.GetError().GetResponseCode()),
+ delete_outcome.GetError().GetRequestId()};
}
if (!delete_outcome.GetResult().GetErrors().empty()) {
const auto& e = delete_outcome.GetResult().GetErrors().front();
- return {.status = Status::InternalError("failed to delete
object {}: {}", opts.key,
- e.GetMessage())};
+ return {convert_to_obj_response(Status::InternalError(
+ "failed to delete object {}: {}", opts.key,
e.GetMessage()))};
}
}
is_trucated = result.GetIsTruncated();
@@ -345,4 +384,5 @@ ObjectStorageResponse
S3ObjStorageClient::delete_objects_recursively(
} while (is_trucated);
return {};
}
+
} // namespace doris::io
\ No newline at end of file
diff --git a/be/src/io/fs/s3_obj_storage_client.h
b/be/src/io/fs/s3_obj_storage_client.h
index 47a9ed733db..ea2b00e58df 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/be/src/io/fs/s3_obj_storage_client.h
@@ -39,11 +39,12 @@ class S3ObjStorageClient final : public ObjStorageClient {
public:
S3ObjStorageClient(std::shared_ptr<Aws::S3::S3Client> client) :
_client(std::move(client)) {}
~S3ObjStorageClient() override = default;
- ObjectStorageResponse create_multipart_upload(const
ObjectStoragePathOptions& opts) override;
+ ObjectStorageUploadResponse create_multipart_upload(
+ const ObjectStoragePathOptions& opts) override;
ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
std::string_view stream) override;
- ObjectStorageResponse upload_part(const ObjectStoragePathOptions& opts,
std::string_view,
- int partNum) override;
+ ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions&
opts, std::string_view,
+ int partNum) override;
ObjectStorageResponse complete_multipart_upload(
const ObjectStoragePathOptions& opts,
const ObjectCompleteMultiParts& completed_parts) override;
diff --git a/be/test/io/fs/s3_file_writer_test.cpp
b/be/test/io/fs/s3_file_writer_test.cpp
index 7bad5ad8630..75a49d813a4 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -943,9 +943,8 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_2) {
sp->set_call_back("S3FileWriter::_complete:2", [](auto&& outcome) {
// Deliberately make one upload one part task fail to test if s3 file
writer could
// handle io error
- std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>* parts =
-
try_any_cast<std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>(
- outcome.back());
+ auto* parts =
try_any_cast<std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>(
+ outcome.back());
size_t size = parts->size();
parts->back()->SetPartNumber(size + 2);
});
@@ -992,12 +991,9 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_1) {
sp->set_call_back("S3FileWriter::_complete:1", [](auto&& outcome) {
// Deliberately make one upload one part task fail to test if s3 file
writer could
// handle io error
- const std::pair<std::atomic_bool*,
-
std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>& points =
- try_any_cast<const std::pair<
- std::atomic_bool*,
-
std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>&>(
- outcome.back());
+ const auto& points = try_any_cast<const std::pair<
+ std::atomic_bool*,
std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>*>&>(
+ outcome.back());
(*points.first) = false;
points.second->pop_back();
});
@@ -1044,7 +1040,8 @@ TEST_F(S3FileWriterTest, multi_part_complete_error_3) {
sp->set_call_back("S3FileWriter::_complete:3", [](auto&& outcome) {
auto pair = try_any_cast_ret<io::ObjectStorageResponse>(outcome);
pair->second = true;
- pair->first = io::ObjectStorageResponse {.status =
Status::IOError("inject error")};
+ pair->first = io::ObjectStorageResponse {
+ .status =
convert_to_obj_response(Status::IOError<false>("inject error"))};
});
Defer defer {[&]() { sp->clear_call_back("S3FileWriter::_complete:3"); }};
auto client = s3_fs->client_holder();
diff --git a/cloud/src/recycler/obj_store_accessor.h
b/cloud/src/recycler/obj_store_accessor.h
index 8787841aba7..a29266133ca 100644
--- a/cloud/src/recycler/obj_store_accessor.h
+++ b/cloud/src/recycler/obj_store_accessor.h
@@ -17,6 +17,7 @@
#pragma once
+#include <functional>
#include <string>
#include <vector>
@@ -25,6 +26,7 @@ namespace doris::cloud {
struct ObjectMeta {
std::string path; // Relative path to accessor prefix
int64_t size {0};
+ int64_t last_modify_second {0};
};
enum class AccessorType {
@@ -69,4 +71,59 @@ private:
const AccessorType type_;
};
+struct ObjectStoragePathOptions {
+ std::string bucket; // blob container in azure
+ std::string key; // blob name
+ std::string prefix; // for batch delete and recursive delete
+ std::string_view endpoint;
+};
+
+struct ObjectStorageDeleteExpiredOptions {
+ ObjectStoragePathOptions path_opts;
+ std::function<std::string(const std::string& path)> relative_path_factory;
+};
+
+struct ObjectCompleteMultiParts {};
+
+struct ObjectStorageResponse {
+ ObjectStorageResponse(int r, std::string msg = "") : ret(r),
error_msg(std::move(msg)) {}
+ // clang-format off
+ int ret {0}; // To unify the error handle logic with BE, we'd better use
the same error code as BE
+ // clang-format on
+ std::string error_msg;
+};
+
+// wrapper class owned by concrete fs
+class ObjStorageClient {
+public:
+ virtual ~ObjStorageClient() = default;
+ // To directly upload a piece of data to object storage and generate a
user-visible file.
+ // You need to clearly specify the bucket and key
+ virtual ObjectStorageResponse put_object(const ObjectStoragePathOptions&
opts,
+ std::string_view stream) = 0;
+ // According to the passed bucket and key, it will access whether the
corresponding file exists in the object storage.
+ // If it exists, it will return the corresponding file size
+ virtual ObjectStorageResponse head_object(const ObjectStoragePathOptions&
opts) = 0;
+ // According to the passed bucket and prefix, it traverses and retrieves
all files under the prefix, and returns the name and file size of all files.
+ // **Attention**: The ObjectMeta contains the full key in object storage
+ virtual ObjectStorageResponse list_objects(const ObjectStoragePathOptions&
opts,
+ std::vector<ObjectMeta>* files)
= 0;
+ // According to the bucket and prefix specified by the user, it performs
batch deletion based on the object names in the object array.
+ virtual ObjectStorageResponse delete_objects(const
ObjectStoragePathOptions& opts,
+ std::vector<std::string>
objs) = 0;
+ // Delete the file named key in the object storage bucket.
+ virtual ObjectStorageResponse delete_object(const
ObjectStoragePathOptions& opts) = 0;
+ // According to the prefix, recursively delete all files under the prefix.
+ virtual ObjectStorageResponse delete_objects_recursively(
+ const ObjectStoragePathOptions& opts) = 0;
+ // Delete all the objects under the prefix which expires before the
expired_time
+ virtual ObjectStorageResponse delete_expired(const
ObjectStorageDeleteExpiredOptions& opts,
+ int64_t expired_time) = 0;
+ // Get the objects' expiration time on the bucket
+ virtual ObjectStorageResponse get_life_cycle(const
ObjectStoragePathOptions& opts,
+ int64_t* expiration_days) = 0;
+ // Check if the objects' versioning is on or off
+ virtual ObjectStorageResponse check_versioning(const
ObjectStoragePathOptions& opts) = 0;
+};
+
} // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.cpp
b/cloud/src/recycler/s3_accessor.cpp
index 3f5d78b1c8f..04f642f3831 100644
--- a/cloud/src/recycler/s3_accessor.cpp
+++ b/cloud/src/recycler/s3_accessor.cpp
@@ -32,6 +32,7 @@
#include <algorithm>
#include <execution>
+#include <type_traits>
#include <utility>
#include "common/config.h"
@@ -39,6 +40,7 @@
#include "common/sync_point.h"
#include "rate-limiter/s3_rate_limiter.h"
#include "recycler/obj_store_accessor.h"
+#include "recycler/s3_obj_client.h"
namespace doris::cloud {
@@ -53,43 +55,28 @@ private:
std::array<std::unique_ptr<S3RateLimiterHolder>, 2> _rate_limiters;
};
-[[maybe_unused]] static Aws::Client::AWSError<Aws::S3::S3Errors>
s3_error_factory() {
- return {Aws::S3::S3Errors::INTERNAL_FAILURE, "exceeds limit", "exceeds
limit", false};
-}
-
template <typename Func>
-auto do_s3_rate_limit(S3RateLimitType type, Func callback) ->
decltype(callback()) {
+auto s3_rate_limit(S3RateLimitType op, Func callback) -> decltype(callback()) {
using T = decltype(callback());
if (!config::enable_s3_rate_limiter) {
return callback();
}
- auto sleep_duration =
AccessorRateLimiter::instance().rate_limiter(type)->add(1);
+ auto sleep_duration =
AccessorRateLimiter::instance().rate_limiter(op)->add(1);
if (sleep_duration < 0) {
- return T(s3_error_factory());
+ return T(-1);
}
return callback();
}
-#ifndef UNIT_TEST
-#define HELP_MACRO(ret, req, point_name)
-#else
-#define HELP_MACRO(ret, req, point_name) \
- do { \
- std::pair p {&ret, &req}; \
- [[maybe_unused]] auto ret_pair = [&p]() mutable { \
- TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, &p); \
- return p; \
- }(); \
- return ret; \
- } while (false);
-#endif
-#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name, type) \
- [&]() -> decltype(auto) { \
- using T = decltype((expr)); \
- [[maybe_unused]] T t; \
- HELP_MACRO(t, request, point_name) \
- return do_s3_rate_limit(type, [&]() { return (expr); }); \
- }()
+template <typename Func>
+auto s3_get_rate_limit(Func callback) -> decltype(callback()) {
+ return s3_rate_limit(S3RateLimitType::GET, std::move(callback));
+}
+
+template <typename Func>
+auto s3_put_rate_limit(Func callback) -> decltype(callback()) {
+ return s3_rate_limit(S3RateLimitType::PUT, std::move(callback));
+}
AccessorRateLimiter::AccessorRateLimiter()
: _rate_limiters {std::make_unique<S3RateLimiterHolder>(
@@ -157,83 +144,20 @@ int S3Accessor::init() {
aws_config.region = conf_.region;
aws_config.retryStrategy =
std::make_shared<Aws::Client::DefaultRetryStrategy>(
/*maxRetries = 10, scaleFactor = 25*/);
- s3_client_ = std::make_shared<Aws::S3::S3Client>(
+ auto s3_client = std::make_shared<Aws::S3::S3Client>(
std::move(aws_cred), std::move(aws_config),
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
true /* useVirtualAddressing */);
+ obj_client_ = std::make_shared<S3ObjClient>(std::move(s3_client));
return 0;
}
int S3Accessor::delete_objects_by_prefix(const std::string& relative_path) {
- Aws::S3::Model::ListObjectsV2Request request;
- auto prefix = get_key(relative_path);
- request.WithBucket(conf_.bucket).WithPrefix(prefix);
-
- Aws::S3::Model::DeleteObjectsRequest delete_request;
- delete_request.SetBucket(conf_.bucket);
- bool is_truncated = false;
- do {
- auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_client_->ListObjectsV2(request), std::ref(request).get(),
- "s3_client::list_objects_v2", S3RateLimitType::GET);
- if (!outcome.IsSuccess()) {
- LOG_WARNING("failed to list objects")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", prefix)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
- if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::FORBIDDEN) {
- return 1;
- }
- return -1;
- }
- const auto& result = outcome.GetResult();
- VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
- Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
- objects.reserve(result.GetContents().size());
- for (const auto& obj : result.GetContents()) {
- objects.emplace_back().SetKey(obj.GetKey());
- LOG_INFO("delete object")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("key", obj.GetKey());
- }
- if (!objects.empty()) {
- Aws::S3::Model::Delete del;
- del.WithObjects(std::move(objects)).SetQuiet(true);
- delete_request.SetDelete(std::move(del));
- auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_client_->DeleteObjects(delete_request),
std::ref(delete_request).get(),
- "s3_client::delete_objects", S3RateLimitType::PUT);
- if (!delete_outcome.IsSuccess()) {
- LOG_WARNING("failed to delete objects")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", prefix)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
- if (delete_outcome.GetError().GetResponseCode() ==
- Aws::Http::HttpResponseCode::FORBIDDEN) {
- return 1;
- }
- return -2;
- }
- if (!delete_outcome.GetResult().GetErrors().empty()) {
- const auto& e = delete_outcome.GetResult().GetErrors().front();
- LOG_WARNING("failed to delete object")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("key", e.GetKey())
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", e.GetMessage());
- return -3;
- }
- }
- is_truncated = result.GetIsTruncated();
- request.SetContinuationToken(result.GetNextContinuationToken());
- } while (is_truncated);
- return 0;
+ return s3_get_rate_limit([&]() {
+ return obj_client_->delete_objects_recursively(
+ {.bucket = conf_.bucket, .prefix =
get_key(relative_path)});
+ })
+ .ret;
}
int S3Accessor::delete_objects(const std::vector<std::string>& relative_paths)
{
@@ -244,11 +168,8 @@ int S3Accessor::delete_objects(const
std::vector<std::string>& relative_paths) {
constexpr size_t max_delete_batch = 1000;
auto path_iter = relative_paths.begin();
- Aws::S3::Model::DeleteObjectsRequest delete_request;
- delete_request.SetBucket(conf_.bucket);
do {
- Aws::S3::Model::Delete del;
- Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+ Aws::Vector<std::string> objects;
auto path_begin = path_iter;
for (; path_iter != relative_paths.end() && (path_iter - path_begin <
max_delete_batch);
++path_iter) {
@@ -258,36 +179,16 @@ int S3Accessor::delete_objects(const
std::vector<std::string>& relative_paths) {
.tag("bucket", conf_.bucket)
.tag("key", key)
.tag("size", objects.size());
- objects.emplace_back().SetKey(std::move(key));
+ objects.emplace_back(std::move(key));
}
if (objects.empty()) {
return 0;
}
- del.WithObjects(std::move(objects)).SetQuiet(true);
- delete_request.SetDelete(std::move(del));
- auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_client_->DeleteObjects(delete_request),
std::ref(delete_request).get(),
- "s3_client::delete_objects", S3RateLimitType::PUT);
- if (!delete_outcome.IsSuccess()) {
- LOG_WARNING("failed to delete objects")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("key[0]",
delete_request.GetDelete().GetObjects().front().GetKey())
- .tag("responseCode",
-
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
- .tag("error", delete_outcome.GetError().GetMessage());
- return -1;
- }
- if (!delete_outcome.GetResult().GetErrors().empty()) {
- const auto& e = delete_outcome.GetResult().GetErrors().front();
- LOG_WARNING("failed to delete object")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("key", e.GetKey())
- .tag("responseCode",
-
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
- .tag("error", e.GetMessage());
- return -2;
+ if (auto delete_resp = s3_put_rate_limit([&]() {
+ return obj_client_->delete_objects({.bucket = conf_.bucket},
std::move(objects));
+ });
+ delete_resp.ret != 0) {
+ return delete_resp.ret;
}
} while (path_iter != relative_paths.end());
@@ -295,218 +196,64 @@ int S3Accessor::delete_objects(const
std::vector<std::string>& relative_paths) {
}
int S3Accessor::delete_object(const std::string& relative_path) {
- Aws::S3::Model::DeleteObjectRequest request;
- auto key = get_key(relative_path);
- request.WithBucket(conf_.bucket).WithKey(key);
- auto outcome =
- SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObject(request),
std::ref(request).get(),
- "s3_client::delete_object",
S3RateLimitType::PUT);
- if (!outcome.IsSuccess()) {
- LOG_WARNING("failed to delete object")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("key", key)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage())
- .tag("exception", outcome.GetError().GetExceptionName());
- return -1;
- }
- return 0;
+ return s3_put_rate_limit([&]() {
+ return obj_client_->delete_object({.bucket = conf_.bucket, .key =
get_key(relative_path)})
+ .ret;
+ });
}
int S3Accessor::put_object(const std::string& relative_path, const
std::string& content) {
- Aws::S3::Model::PutObjectRequest request;
- auto key = get_key(relative_path);
- request.WithBucket(conf_.bucket).WithKey(key);
- auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor");
- *input << content;
- request.SetBody(input);
- auto outcome =
- SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request),
std::ref(request).get(),
- "s3_client::put_object",
S3RateLimitType::PUT);
- if (!outcome.IsSuccess()) {
- LOG_WARNING("failed to put object")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("key", key)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
- return -1;
- }
- return 0;
+ return s3_put_rate_limit([&]() {
+ return obj_client_
+ ->put_object({.bucket = conf_.bucket, .key =
get_key(relative_path)}, content)
+ .ret;
+ });
}
int S3Accessor::list(const std::string& relative_path,
std::vector<ObjectMeta>* files) {
- Aws::S3::Model::ListObjectsV2Request request;
- auto prefix = get_key(relative_path);
- request.WithBucket(conf_.bucket).WithPrefix(prefix);
-
- bool is_truncated = false;
- do {
- auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_client_->ListObjectsV2(request), std::ref(request).get(),
- "s3_client::list_objects_v2", S3RateLimitType::GET);
- ;
- if (!outcome.IsSuccess()) {
- LOG_WARNING("failed to list objects")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", prefix)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
- return -1;
- }
- const auto& result = outcome.GetResult();
- VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
- for (const auto& obj : result.GetContents()) {
- files->push_back({obj.GetKey().substr(conf_.prefix.size() + 1),
obj.GetSize()});
+ return s3_get_rate_limit([&]() {
+ auto resp = obj_client_->list_objects(
+ {.bucket = conf_.bucket, .prefix = get_key(relative_path)},
files);
+
+ if (resp.ret == 0) {
+ auto pos = conf_.prefix.size() + 1;
+ for (auto&& file : *files) {
+ file.path = file.path.substr(pos);
+ }
}
- is_truncated = result.GetIsTruncated();
- request.SetContinuationToken(result.GetNextContinuationToken());
- } while (is_truncated);
- return 0;
+
+ return resp.ret;
+ });
}
int S3Accessor::exist(const std::string& relative_path) {
- Aws::S3::Model::HeadObjectRequest request;
- auto key = get_key(relative_path);
- request.WithBucket(conf_.bucket).WithKey(key);
- auto outcome =
- SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request),
std::ref(request).get(),
- "s3_client::head_object",
S3RateLimitType::GET);
- if (outcome.IsSuccess()) {
- return 0;
- } else if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
- return 1;
- } else {
- LOG_WARNING("failed to head object")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("key", key)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
- return -1;
- }
+ return s3_get_rate_limit([&]() {
+ return obj_client_->head_object({.bucket = conf_.bucket, .key =
get_key(relative_path)})
+ .ret;
+ });
}
int S3Accessor::delete_expired_objects(const std::string& relative_path,
int64_t expired_time) {
- Aws::S3::Model::ListObjectsV2Request request;
- auto prefix = get_key(relative_path);
- request.WithBucket(conf_.bucket).WithPrefix(prefix);
-
- bool is_truncated = false;
- do {
- auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_client_->ListObjectsV2(request), std::ref(request).get(),
- "s3_client::list_objects_v2", S3RateLimitType::GET);
- if (!outcome.IsSuccess()) {
- LOG_WARNING("failed to list objects")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", prefix)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
- return -1;
- }
- const auto& result = outcome.GetResult();
- std::vector<std::string> expired_keys;
- for (const auto& obj : result.GetContents()) {
- if (obj.GetLastModified().Seconds() < expired_time) {
- auto relative_key = get_relative_path(obj.GetKey());
- if (relative_key.empty()) {
- LOG_WARNING("failed get relative path")
- .tag("prefix", conf_.prefix)
- .tag("key", obj.GetKey());
- } else {
- expired_keys.push_back(relative_key);
- LOG_INFO("delete expired object")
- .tag("prefix", conf_.prefix)
- .tag("key", obj.GetKey())
- .tag("relative_key", relative_key)
- .tag("lastModifiedTime",
obj.GetLastModified().Seconds())
- .tag("expiredTime", expired_time);
- }
- }
- }
-
- auto ret = delete_objects(expired_keys);
- if (ret != 0) {
- return ret;
- }
- LOG_INFO("delete expired objects")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", conf_.prefix)
- .tag("num_scanned", result.GetContents().size())
- .tag("num_recycled", expired_keys.size());
- is_truncated = result.GetIsTruncated();
- request.SetContinuationToken(result.GetNextContinuationToken());
- } while (is_truncated);
- return 0;
+ return s3_put_rate_limit([&]() {
+ return obj_client_
+ ->delete_expired(
+ {.path_opts = {.bucket = conf_.bucket, .prefix =
get_key(relative_path)},
+ .relative_path_factory =
+ [&](const std::string& key) { return
get_relative_path(key); }},
+ expired_time)
+ .ret;
+ });
}
int S3Accessor::get_bucket_lifecycle(int64_t* expiration_days) {
- Aws::S3::Model::GetBucketLifecycleConfigurationRequest request;
- request.SetBucket(conf_.bucket);
-
- auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_client_->GetBucketLifecycleConfiguration(request),
std::ref(request).get(),
- "s3_client::get_bucket_lifecycle_configuration",
S3RateLimitType::GET);
- bool has_lifecycle = false;
- if (outcome.IsSuccess()) {
- const auto& rules = outcome.GetResult().GetRules();
- for (const auto& rule : rules) {
- if (rule.NoncurrentVersionExpirationHasBeenSet()) {
- has_lifecycle = true;
- *expiration_days =
rule.GetNoncurrentVersionExpiration().GetNoncurrentDays();
- }
- }
- } else {
- LOG_WARNING("Err for check interval: failed to get bucket lifecycle")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", conf_.prefix)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
- return -1;
- }
-
- if (!has_lifecycle) {
- LOG_WARNING("Err for check interval: bucket doesn't have lifecycle
configuration")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", conf_.prefix);
- return -1;
- }
- return 0;
+ return s3_get_rate_limit([&]() {
+ return obj_client_->get_life_cycle({.bucket = conf_.bucket},
expiration_days).ret;
+ });
}
int S3Accessor::check_bucket_versioning() {
- Aws::S3::Model::GetBucketVersioningRequest request;
- request.SetBucket(conf_.bucket);
- auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
- s3_client_->GetBucketVersioning(request), std::ref(request).get(),
- "s3_client::get_bucket_versioning", S3RateLimitType::GET);
-
- if (outcome.IsSuccess()) {
- const auto& versioning_configuration = outcome.GetResult().GetStatus();
- if (versioning_configuration !=
Aws::S3::Model::BucketVersioningStatus::Enabled) {
- LOG_WARNING("Err for check interval: bucket doesn't enable bucket
versioning")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", conf_.prefix);
- return -1;
- }
- } else {
- LOG_WARNING("Err for check interval: failed to get status of bucket
versioning")
- .tag("endpoint", conf_.endpoint)
- .tag("bucket", conf_.bucket)
- .tag("prefix", conf_.prefix)
- .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
- .tag("error", outcome.GetError().GetMessage());
- return -1;
- }
- return 0;
+ return s3_get_rate_limit(
+ [&]() { return obj_client_->check_versioning({.bucket =
conf_.bucket}).ret; });
}
int GcsAccessor::delete_objects(const std::vector<std::string>&
relative_paths) {
@@ -523,6 +270,4 @@ int GcsAccessor::delete_objects(const
std::vector<std::string>& relative_paths)
}
return ret;
}
-#undef SYNC_POINT_HOOK_RETURN_VALUE
-#undef HELP_MACRO
} // namespace doris::cloud
diff --git a/cloud/src/recycler/s3_accessor.h b/cloud/src/recycler/s3_accessor.h
index 1a44b0971cb..5221f90cafc 100644
--- a/cloud/src/recycler/s3_accessor.h
+++ b/cloud/src/recycler/s3_accessor.h
@@ -20,6 +20,7 @@
#include <memory>
#include "recycler/obj_store_accessor.h"
+#include "recycler/s3_obj_client.h"
namespace Aws::S3 {
class S3Client;
@@ -47,7 +48,8 @@ public:
const std::string& path() const override { return path_; }
- const std::shared_ptr<Aws::S3::S3Client>& s3_client() const { return
s3_client_; }
+ // TODO(ByteYue): refactor this function to suit different kinds of object
storage
+ const std::shared_ptr<Aws::S3::S3Client>& s3_client() const { return
obj_client_->s3_client(); }
const S3Conf& conf() const { return conf_; }
@@ -89,9 +91,9 @@ private:
std::string get_relative_path(const std::string& key) const;
private:
- std::shared_ptr<Aws::S3::S3Client> s3_client_;
S3Conf conf_;
std::string path_;
+ std::shared_ptr<S3ObjClient> obj_client_;
};
class GcsAccessor final : public S3Accessor {
diff --git a/cloud/src/recycler/s3_obj_client.cpp
b/cloud/src/recycler/s3_obj_client.cpp
new file mode 100644
index 00000000000..2b3d83cd8ce
--- /dev/null
+++ b/cloud/src/recycler/s3_obj_client.cpp
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "recycler/s3_obj_client.h"
+
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/DeleteObjectRequest.h>
+#include <aws/s3/model/DeleteObjectsRequest.h>
+#include <aws/s3/model/GetBucketLifecycleConfigurationRequest.h>
+#include <aws/s3/model/GetBucketVersioningRequest.h>
+#include <aws/s3/model/HeadObjectRequest.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
+#include <aws/s3/model/PutObjectRequest.h>
+
+#include "common/logging.h"
+#include "common/sync_point.h"
+
+namespace doris::cloud {
+
+#ifndef UNIT_TEST
+#define HELPER_MACRO(ret, req, point_name)
+#else
+#define HELPER_MACRO(ret, req, point_name) \
+ do { \
+ std::pair p {&ret, &req}; \
+ [[maybe_unused]] auto ret_pair = [&p]() mutable { \
+ TEST_SYNC_POINT_RETURN_WITH_VALUE(point_name, &p); \
+ return p; \
+ }(); \
+ return ret; \
+ } while (false);
+#endif
+#define SYNC_POINT_HOOK_RETURN_VALUE(expr, request, point_name) \
+ [&]() -> decltype(auto) { \
+ using T = decltype((expr)); \
+ [[maybe_unused]] T t; \
+ HELPER_MACRO(t, request, point_name) \
+ return (expr); \
+ }()
+
+ObjectStorageResponse S3ObjClient::put_object(const ObjectStoragePathOptions&
opts,
+ std::string_view stream) {
+ Aws::S3::Model::PutObjectRequest request;
+ request.WithBucket(opts.bucket).WithKey(opts.key);
+ auto input = Aws::MakeShared<Aws::StringStream>("S3Accessor");
+ *input << stream;
+ request.SetBody(input);
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->PutObject(request),
+ std::ref(request).get(),
"s3_client::put_object");
+ if (!outcome.IsSuccess()) {
+ LOG_WARNING("failed to put object")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("key", opts.key)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage());
+ return -1;
+ }
+ return 0;
+}
+ObjectStorageResponse S3ObjClient::head_object(const ObjectStoragePathOptions&
opts) {
+ Aws::S3::Model::HeadObjectRequest request;
+ request.WithBucket(opts.bucket).WithKey(opts.key);
+ auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->HeadObject(request),
+ std::ref(request).get(),
"s3_client::head_object");
+ if (outcome.IsSuccess()) {
+ return 0;
+ } else if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
+ return 1;
+ } else {
+ LOG_WARNING("failed to head object")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("key", opts.key)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage());
+ return -1;
+ }
+}
+ObjectStorageResponse S3ObjClient::list_objects(const
ObjectStoragePathOptions& opts,
+ std::vector<ObjectMeta>*
files) {
+ Aws::S3::Model::ListObjectsV2Request request;
+ request.WithBucket(opts.bucket).WithPrefix(opts.prefix);
+
+ bool is_truncated = false;
+ do {
+ auto outcome =
+
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+ std::ref(request).get(),
"s3_client::list_objects_v2");
+ if (!outcome.IsSuccess()) {
+ LOG_WARNING("failed to list objects")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("prefix", opts.prefix)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage());
+ return -1;
+ }
+ const auto& result = outcome.GetResult();
+ VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
+ for (const auto& obj : result.GetContents()) {
+ files->push_back({obj.GetKey(), obj.GetSize(),
obj.GetLastModified().Seconds()});
+ }
+ is_truncated = result.GetIsTruncated();
+ request.SetContinuationToken(result.GetNextContinuationToken());
+ } while (is_truncated);
+ return 0;
+}
+ObjectStorageResponse S3ObjClient::delete_objects(const
ObjectStoragePathOptions& opts,
+ std::vector<std::string>
objs) {
+ Aws::S3::Model::DeleteObjectsRequest delete_request;
+ delete_request.SetBucket(opts.bucket);
+ Aws::S3::Model::Delete del;
+ Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+ for (auto&& obj : objs) {
+ objects.emplace_back().SetKey(std::move(obj));
+ }
+ del.WithObjects(std::move(objects)).SetQuiet(true);
+ delete_request.SetDelete(std::move(del));
+ auto delete_outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->DeleteObjects(delete_request),
+
std::ref(delete_request).get(),
+
"s3_client::delete_objects");
+ if (!delete_outcome.IsSuccess()) {
+ LOG_WARNING("failed to delete objects")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("key[0]",
delete_request.GetDelete().GetObjects().front().GetKey())
+ .tag("responseCode",
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
+ .tag("error", delete_outcome.GetError().GetMessage());
+ return {-1};
+ }
+ if (!delete_outcome.GetResult().GetErrors().empty()) {
+ const auto& e = delete_outcome.GetResult().GetErrors().front();
+ LOG_WARNING("failed to delete object")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("key", e.GetKey())
+ .tag("responseCode",
static_cast<int>(delete_outcome.GetError().GetResponseCode()))
+ .tag("error", e.GetMessage());
+ return {-2};
+ }
+ return {0};
+}
+
+ObjectStorageResponse S3ObjClient::delete_object(const
ObjectStoragePathOptions& opts) {
+ Aws::S3::Model::DeleteObjectRequest request;
+ request.WithBucket(opts.bucket).WithKey(opts.key);
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->DeleteObject(request), std::ref(request).get(),
"s3_client::delete_object");
+ if (!outcome.IsSuccess()) {
+ LOG_WARNING("failed to delete object")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("key", opts.key)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage())
+ .tag("exception", outcome.GetError().GetExceptionName());
+ return -1;
+ }
+ return 0;
+}
+
+ObjectStorageResponse S3ObjClient::delete_objects_recursively(
+ const ObjectStoragePathOptions& opts) {
+ Aws::S3::Model::ListObjectsV2Request request;
+ request.WithBucket(opts.bucket).WithPrefix(opts.prefix);
+
+ Aws::S3::Model::DeleteObjectsRequest delete_request;
+ delete_request.SetBucket(opts.bucket);
+ bool is_truncated = false;
+ do {
+ auto outcome =
+
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+ std::ref(request).get(),
"s3_client::list_objects_v2");
+
+ if (!outcome.IsSuccess()) {
+ LOG_WARNING("failed to list objects")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("prefix", opts.prefix)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage());
+ if (outcome.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::FORBIDDEN) {
+ return {1};
+ }
+ return {-1};
+ }
+ const auto& result = outcome.GetResult();
+ VLOG_DEBUG << "get " << result.GetContents().size() << " objects";
+ Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+ objects.reserve(result.GetContents().size());
+ for (const auto& obj : result.GetContents()) {
+ objects.emplace_back().SetKey(obj.GetKey());
+ LOG_INFO("delete object")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("key", obj.GetKey());
+ }
+ if (!objects.empty()) {
+ Aws::S3::Model::Delete del;
+ del.WithObjects(std::move(objects)).SetQuiet(true);
+ delete_request.SetDelete(std::move(del));
+ auto delete_outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->DeleteObjects(delete_request),
std::ref(delete_request).get(),
+ "s3_client::delete_objects");
+ if (!delete_outcome.IsSuccess()) {
+ LOG_WARNING("failed to delete objects")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("prefix", opts.prefix)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage());
+ if (delete_outcome.GetError().GetResponseCode() ==
+ Aws::Http::HttpResponseCode::FORBIDDEN) {
+ return {1};
+ }
+ return {-2};
+ }
+ if (!delete_outcome.GetResult().GetErrors().empty()) {
+ const auto& e = delete_outcome.GetResult().GetErrors().front();
+ LOG_WARNING("failed to delete object")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("key", e.GetKey())
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", e.GetMessage());
+ return {-3};
+ }
+ }
+ is_truncated = result.GetIsTruncated();
+ request.SetContinuationToken(result.GetNextContinuationToken());
+ } while (is_truncated);
+ return {0};
+}
+ObjectStorageResponse S3ObjClient::delete_expired(const
ObjectStorageDeleteExpiredOptions& opts,
+ int64_t expired_time) {
+ Aws::S3::Model::ListObjectsV2Request request;
+
request.WithBucket(opts.path_opts.bucket).WithPrefix(opts.path_opts.prefix);
+
+ bool is_truncated = false;
+ do {
+ auto outcome =
+
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->ListObjectsV2(request),
+ std::ref(request).get(),
"s3_client::list_objects_v2");
+ if (!outcome.IsSuccess()) {
+ LOG_WARNING("failed to list objects")
+ .tag("endpoint", opts.path_opts.endpoint)
+ .tag("bucket", opts.path_opts.bucket)
+ .tag("prefix", opts.path_opts.prefix)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage());
+ return -1;
+ }
+ const auto& result = outcome.GetResult();
+ std::vector<std::string> expired_keys;
+ for (const auto& obj : result.GetContents()) {
+ if (obj.GetLastModified().Seconds() < expired_time) {
+ auto relative_key = opts.relative_path_factory(obj.GetKey());
+ if (relative_key.empty()) {
+ LOG_WARNING("failed get relative path")
+ .tag("prefix", opts.path_opts.prefix)
+ .tag("key", obj.GetKey());
+ } else {
+ expired_keys.push_back(obj.GetKey());
+ LOG_INFO("delete expired object")
+ .tag("prefix", opts.path_opts.prefix)
+ .tag("key", obj.GetKey())
+ .tag("relative_key", relative_key)
+ .tag("lastModifiedTime",
obj.GetLastModified().Seconds())
+ .tag("expiredTime", expired_time);
+ }
+ }
+ }
+
+ auto ret = delete_objects(opts.path_opts, std::move(expired_keys));
+ if (ret.ret != 0) {
+ return ret;
+ }
+ LOG_INFO("delete expired objects")
+ .tag("endpoint", opts.path_opts.endpoint)
+ .tag("bucket", opts.path_opts.bucket)
+ .tag("prefix", opts.path_opts.prefix)
+ .tag("num_scanned", result.GetContents().size())
+ .tag("num_recycled", expired_keys.size());
+ is_truncated = result.GetIsTruncated();
+ request.SetContinuationToken(result.GetNextContinuationToken());
+ } while (is_truncated);
+ return 0;
+}
+ObjectStorageResponse S3ObjClient::get_life_cycle(const
ObjectStoragePathOptions& opts,
+ int64_t* expiration_days) {
+ Aws::S3::Model::GetBucketLifecycleConfigurationRequest request;
+ request.SetBucket(opts.bucket);
+
+ auto outcome = SYNC_POINT_HOOK_RETURN_VALUE(
+ s3_client_->GetBucketLifecycleConfiguration(request),
std::ref(request).get(),
+ "s3_client::get_bucket_lifecycle_configuration");
+ bool has_lifecycle = false;
+ if (outcome.IsSuccess()) {
+ const auto& rules = outcome.GetResult().GetRules();
+ for (const auto& rule : rules) {
+ if (rule.NoncurrentVersionExpirationHasBeenSet()) {
+ has_lifecycle = true;
+ *expiration_days =
rule.GetNoncurrentVersionExpiration().GetNoncurrentDays();
+ }
+ }
+ } else {
+ LOG_WARNING("Err for check interval: failed to get bucket lifecycle")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("prefix", opts.prefix)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage());
+ return -1;
+ }
+
+ if (!has_lifecycle) {
+ LOG_WARNING("Err for check interval: bucket doesn't have lifecycle
configuration")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("prefix", opts.prefix);
+ return -1;
+ }
+ return 0;
+}
+
+ObjectStorageResponse S3ObjClient::check_versioning(const
ObjectStoragePathOptions& opts) {
+ Aws::S3::Model::GetBucketVersioningRequest request;
+ request.SetBucket(opts.bucket);
+ auto outcome =
SYNC_POINT_HOOK_RETURN_VALUE(s3_client_->GetBucketVersioning(request),
+ std::ref(request).get(),
+
"s3_client::get_bucket_versioning");
+
+ if (outcome.IsSuccess()) {
+ const auto& versioning_configuration = outcome.GetResult().GetStatus();
+ if (versioning_configuration !=
Aws::S3::Model::BucketVersioningStatus::Enabled) {
+ LOG_WARNING("Err for check interval: bucket doesn't enable bucket
versioning")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("prefix", opts.prefix);
+ return -1;
+ }
+ } else {
+ LOG_WARNING("Err for check interval: failed to get status of bucket
versioning")
+ .tag("endpoint", opts.endpoint)
+ .tag("bucket", opts.bucket)
+ .tag("prefix", opts.prefix)
+ .tag("responseCode",
static_cast<int>(outcome.GetError().GetResponseCode()))
+ .tag("error", outcome.GetError().GetMessage());
+ return -1;
+ }
+ return 0;
+}
+
+#undef SYNC_POINT_HOOK_RETURN_VALUE
+#undef HELPER_MACRO
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/be/src/io/fs/s3_obj_storage_client.h
b/cloud/src/recycler/s3_obj_client.h
similarity index 51%
copy from be/src/io/fs/s3_obj_storage_client.h
copy to cloud/src/recycler/s3_obj_client.h
index 47a9ed733db..891474b5289 100644
--- a/be/src/io/fs/s3_obj_storage_client.h
+++ b/cloud/src/recycler/s3_obj_client.h
@@ -17,49 +17,41 @@
#pragma once
-#include "io/fs/obj_storage_client.h"
-#include "io/fs/s3_file_system.h"
+#include <memory>
+
+#include "recycler/obj_store_accessor.h"
namespace Aws::S3 {
class S3Client;
-namespace Model {
-class CompletedPart;
-}
} // namespace Aws::S3
-namespace doris::io {
-
-struct S3CompleteMultiParts : public ObjectCompleteMultiParts {
- std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& parts;
-};
-
-class ObjClientHolder;
+namespace doris::cloud {
-class S3ObjStorageClient final : public ObjStorageClient {
+class S3ObjClient : public ObjStorageClient {
public:
- S3ObjStorageClient(std::shared_ptr<Aws::S3::S3Client> client) :
_client(std::move(client)) {}
- ~S3ObjStorageClient() override = default;
- ObjectStorageResponse create_multipart_upload(const
ObjectStoragePathOptions& opts) override;
+ S3ObjClient(std::shared_ptr<Aws::S3::S3Client> client) :
s3_client_(std::move(client)) {}
+ ~S3ObjClient() override = default;
+
ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
std::string_view stream) override;
- ObjectStorageResponse upload_part(const ObjectStoragePathOptions& opts,
std::string_view,
- int partNum) override;
- ObjectStorageResponse complete_multipart_upload(
- const ObjectStoragePathOptions& opts,
- const ObjectCompleteMultiParts& completed_parts) override;
- ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions&
opts) override;
- ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts,
void* buffer,
- size_t offset, size_t bytes_read,
- size_t* size_return) override;
+ ObjectStorageResponse head_object(const ObjectStoragePathOptions& opts)
override;
ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
- std::vector<FileInfo>* files) override;
+ std::vector<ObjectMeta>* files)
override;
ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts,
std::vector<std::string> objs)
override;
ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts)
override;
ObjectStorageResponse delete_objects_recursively(const
ObjectStoragePathOptions& opts) override;
+ ObjectStorageResponse delete_expired(const
ObjectStorageDeleteExpiredOptions& opts,
+ int64_t expired_time) override;
+ ObjectStorageResponse get_life_cycle(const ObjectStoragePathOptions& opts,
+ int64_t* expiration_days) override;
+
+ ObjectStorageResponse check_versioning(const ObjectStoragePathOptions&
opts) override;
+
+ const std::shared_ptr<Aws::S3::S3Client>& s3_client() { return s3_client_;
}
private:
- std::shared_ptr<Aws::S3::S3Client> _client;
+ std::shared_ptr<Aws::S3::S3Client> s3_client_;
};
-} // namespace doris::io
+} // namespace doris::cloud
\ No newline at end of file
diff --git a/cloud/test/s3_accessor_test.cpp b/cloud/test/s3_accessor_test.cpp
index 972505c3999..362a0b11cbf 100644
--- a/cloud/test/s3_accessor_test.cpp
+++ b/cloud/test/s3_accessor_test.cpp
@@ -397,6 +397,7 @@ TEST(S3AccessorTest, check_bucket_versioning) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -422,6 +423,7 @@ TEST(S3AccessorTest, check_bucket_versioning_error) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
return_error_for_error_s3_client = true;
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
@@ -444,6 +446,7 @@ TEST(S3AccessorTest, get_bucket_lifecycle) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -473,6 +476,7 @@ TEST(S3AccessorTest, get_bucket_lifecycle_error) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
return_error_for_error_s3_client = true;
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
@@ -496,6 +500,7 @@ TEST(S3AccessorTest, list) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -519,6 +524,7 @@ TEST(S3AccessorTest, list_error) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
return_error_for_error_s3_client = true;
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
@@ -543,6 +549,7 @@ TEST(S3AccessorTest, put) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -569,6 +576,7 @@ TEST(S3AccessorTest, put_error) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -601,6 +609,7 @@ TEST(S3AccessorTest, exist) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -624,6 +633,7 @@ TEST(S3AccessorTest, exist_error) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -649,6 +659,7 @@ TEST(S3AccessorTest, delete_object) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -675,6 +686,7 @@ TEST(S3AccessorTest, gcs_delete_objects) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -707,6 +719,7 @@ TEST(S3AccessorTest, gcs_delete_objects_error) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<GcsAccessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -744,6 +757,7 @@ TEST(S3AccessorTest, delete_objects) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -776,6 +790,7 @@ TEST(S3AccessorTest, delete_objects_error) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -817,6 +832,7 @@ TEST(S3AccessorTest, delete_expired_objects) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -871,6 +887,7 @@ TEST(S3AccessorTest, delete_object_by_prefix) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client = std::make_unique<MockS3Client>();
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
@@ -898,6 +915,7 @@ TEST(S3AccessorTest, delete_object_by_prefix_error) {
_mock_fs = std::make_unique<cloud::MockS3Accessor>(cloud::S3Conf {});
_mock_client =
std::make_unique<MockS3Client>(std::make_unique<ErrorS3Client>());
auto accessor = std::make_unique<S3Accessor>(S3Conf {});
+ ASSERT_EQ(0, accessor->init());
auto sp = SyncPoint::get_instance();
std::for_each(callbacks.begin(), callbacks.end(), [&](const MockCallable&
mock_callback) {
sp->set_call_back(fmt::format("{}::pred", mock_callback.point_name),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]