This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 47c7238c038 branch-3.0: [opt](s3io) Check data integrity after an
upload for S3FileWriter #50168 (#50312)
47c7238c038 is described below
commit 47c7238c03823c37be54ff4ea28852f72124a7a2
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 29 15:14:28 2025 +0800
branch-3.0: [opt](s3io) Check data integrity after an upload for
S3FileWriter #50168 (#50312)
Cherry-picked from #50168
Co-authored-by: Gavin Chou <[email protected]>
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/io/fs/s3_file_writer.cpp | 61 +++++++++++++++++++++++++++++++++++
be/test/io/fs/s3_file_writer_test.cpp | 6 ++--
4 files changed, 66 insertions(+), 3 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index c8dcf517139..f5a56c23ebd 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1334,6 +1334,7 @@ DEFINE_mBool(force_azure_blob_global_endpoint, "false");
DEFINE_mInt32(max_s3_client_retry, "10");
DEFINE_mInt32(s3_read_base_wait_time_ms, "100");
DEFINE_mInt32(s3_read_max_wait_time_ms, "800");
+DEFINE_mBool(enable_s3_object_check_after_upload, "true");
DEFINE_mBool(enable_s3_rate_limiter, "false");
DEFINE_mInt64(s3_get_bucket_tokens, "1000000000000000000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d9a93c1f7f6..b2799622d5f 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1426,6 +1426,7 @@ DECLARE_mInt32(max_s3_client_retry);
// and the max retry time is max_s3_client_retry
DECLARE_mInt32(s3_read_base_wait_time_ms);
DECLARE_mInt32(s3_read_max_wait_time_ms);
+DECLARE_mBool(enable_s3_object_check_after_upload);
// write as inverted index tmp directory
DECLARE_String(tmp_file_dir);
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index d5a5cd00cea..5eabeea888d 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -313,6 +313,55 @@ void S3FileWriter::_upload_one_part(int64_t part_num,
UploadFileBuffer& buf) {
_completed_parts.emplace_back(std::move(completed_part));
}
+// Post-upload integrity check for S3FileWriter.
+// Returns OK immediately unless config::enable_s3_object_check_after_upload is set.
+// When enabled:
+//   1. issues a HEAD request for the object at `path_opt` to confirm it exists;
+//   2. compares the size reported by HEAD with `bytes_appended`.
+// `upload_res` is the response of the upload call being verified and is only used to
+// enrich log messages; `put_or_comp` labels that call (e.g. "put_object",
+// "complete_multipart") in the log output.
+// Returns IOError if the HEAD request fails or the reported size differs.
+static Status check_after_upload(ObjStorageClient* client, const ObjectStorageResponse& upload_res,
+                                 const ObjectStoragePathOptions& path_opt, int64_t bytes_appended,
+                                 const std::string& put_or_comp) {
+    if (!config::enable_s3_object_check_after_upload) return Status::OK();
+
+    auto head_res = client->head_object(path_opt);
+
+    // Detailed diagnostics; only formatted on the failure paths below.
+    // clang-format off
+    auto err_msg = [&]() {
+        std::stringstream ss;
+        ss << "failed to check object after upload=" << put_or_comp
+           << " file_path=" << path_opt.path.native()
+           << " " << put_or_comp << "_err=" << upload_res.status.msg
+           << " " << put_or_comp << "_code=" << upload_res.status.code
+           << " " << put_or_comp << "_http_code=" << upload_res.http_code
+           << " " << put_or_comp << "_request_id=" << upload_res.request_id
+           << " head_err=" << head_res.resp.status.msg
+           << " head_code=" << head_res.resp.status.code
+           << " head_http_code=" << head_res.resp.http_code
+           << " head_request_id=" << head_res.resp.request_id;
+        return ss.str();
+    };
+    // clang-format on
+
+    // NOTE(review): the sync point says "check_after_load" although this checks uploads;
+    // the name is kept unchanged so any existing injection hooks still match.
+    // TODO(gavin): make it fail by injection
+    TEST_SYNC_POINT_CALLBACK("S3FileWriter::check_after_load", &head_res);
+    // The HEAD is treated as failed if EITHER indicator reports an error. Requiring both
+    // (`&&`) lets a non-OK status paired with http_code == 200 pass the existence check,
+    // e.g. a NotFound status whose http_code was left at its default (presumably 200 --
+    // confirm against the ObjStorageClient::head_object implementations).
+    if (head_res.resp.status.code != ErrorCode::OK || head_res.resp.http_code != 200) {
+        LOG(WARNING) << "failed to issue head object after upload, " << err_msg();
+        DCHECK(false) << "failed to issue head object after upload, " << err_msg();
+        // FIXME(gavin): we should retry if this HEAD fails?
+        return Status::IOError(
+                "failed to issue head object after upload, file_path={}, status_code={}, "
+                "http_code={}, err={}",
+                path_opt.path.native(), head_res.resp.status.code, head_res.resp.http_code,
+                head_res.resp.status.msg);
+    }
+    if (head_res.file_size != bytes_appended) {
+        LOG(WARNING) << "failed to check size after upload, expected_size=" << bytes_appended
+                     << " actual_size=" << head_res.file_size << ", " << err_msg();
+        DCHECK_EQ(bytes_appended, head_res.file_size)
+                << "failed to check size after upload, " << err_msg();
+        return Status::IOError(
+                "failed to check object size after upload, file_path={}, expected_size={} "
+                "actual_size={}",
+                path_opt.path.native(), bytes_appended, head_res.file_size);
+    }
+    return Status::OK();
+}
+
Status S3FileWriter::_complete() {
const auto& client = _obj_client->get();
if (nullptr == client) {
@@ -368,6 +417,10 @@ Status S3FileWriter::_complete() {
_obj_storage_path_opts.path.native());
return {resp.status.code, std::move(resp.status.msg)};
}
+
+ RETURN_IF_ERROR(check_after_upload(client.get(), resp,
_obj_storage_path_opts, _bytes_appended,
+ "complete_multipart"));
+
s3_file_created_total << 1;
return Status::OK();
}
@@ -414,6 +467,14 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
buf.set_status({resp.status.code, std::move(resp.status.msg)});
return;
}
+
+ auto st = check_after_upload(client.get(), resp, _obj_storage_path_opts,
_bytes_appended,
+ "put_object");
+ if (!st.ok()) {
+ buf.set_status(st);
+ return;
+ }
+
s3_file_created_total << 1;
}
diff --git a/be/test/io/fs/s3_file_writer_test.cpp
b/be/test/io/fs/s3_file_writer_test.cpp
index 79af666f928..0662565ec6a 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -1150,8 +1150,8 @@ public:
}
ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions&
opts) override {
- last_opts = opts;
- return default_head_response;
+ return {.resp = ObjectStorageResponse::OK(),
+ .file_size =
static_cast<int64_t>(objects[opts.path.native()].size())};
}
ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts,
void* buffer,
@@ -1365,7 +1365,7 @@ std::string get_s3_path(std::string_view path) {
// put object
// create_multi_parts_upload + upload_part + complete_parts
-TEST_F(S3FileWriterTest, write_bufer_boundary) {
+TEST_F(S3FileWriterTest, write_buffer_boundary) {
// diable file cache to avoid write to cache
bool enable_file_cache = config::enable_file_cache;
config::enable_file_cache = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]