This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bc4e0e97f2 [enhance](S3FileWriter) abort when s3 file writer abnormally quits and optimize s3 buffer pool (#19944)
bc4e0e97f2 is described below
commit bc4e0e97f2473df488a6a00d3806d7bde7ee380b
Author: AlexYue <[email protected]>
AuthorDate: Fri May 26 09:14:38 2023 +0800
[enhance](S3FileWriter) abort when s3 file writer abnormally quits and optimize s3 buffer pool (#19944)
1. Reduce the cost of the s3 buffer pool's constructor.
2. Before this PR, if an s3 file writer returned an error from append() or close(), the caller would not call abort(), which resulted in a confusing DCHECK failure like the picture in the PR description.
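To make the second point concrete: a call site that returns early on an append/close error used to leave the buffer unreclaimed and later trip that DCHECK. A rough sketch of such a call site follows (write_to_s3 is a hypothetical helper, not part of this patch); with this change the writer's close()/destructor abort the multipart upload themselves, so the early-return pattern is now safe:

    // Hypothetical call site; the early returns used to skip abort().
    Status write_to_s3(io::S3FileWriter& writer, const Slice& data) {
        RETURN_IF_ERROR(writer.appendv(&data, 1)); // early return on error
        RETURN_IF_ERROR(writer.close());           // early return on error
        return Status::OK();
        // after this patch a failed appendv()/close() is followed by an
        // internal abort() in close() or ~S3FileWriter(), instead of the
        // confusing DCHECK at destruction time
    }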
---
be/src/io/fs/s3_file_write_bufferpool.cpp | 37 ++++++++++++++-----------------
be/src/io/fs/s3_file_write_bufferpool.h | 22 +++++++++---------
be/src/io/fs/s3_file_writer.cpp | 36 +++++++++++++++++-------------
be/src/io/fs/s3_file_writer.h | 2 +-
4 files changed, 50 insertions(+), 47 deletions(-)
diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp b/be/src/io/fs/s3_file_write_bufferpool.cpp
index 56437c2a22..423068f387 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ b/be/src/io/fs/s3_file_write_bufferpool.cpp
@@ -28,11 +28,12 @@
namespace doris {
namespace io {
void S3FileBuffer::on_finished() {
- if (_buf == nullptr) {
+ if (_buf.empty()) {
return;
}
reset();
- S3FileBufferPool::GetInstance()->reclaim(shared_from_this());
+ S3FileBufferPool::GetInstance()->reclaim(_buf);
+ _buf.clear();
}
// when there is memory preserved, directly write data to buf
@@ -42,8 +43,8 @@ void S3FileBuffer::append_data(const Slice& data) {
Defer defer {[&] { _size += data.get_size(); }};
while (true) {
// if buf is not empty, it means there is memory preserved for this buf
- if (_buf != nullptr) {
- memcpy(_buf->data() + _size, data.get_data(), data.get_size());
+ if (!_buf.empty()) {
+ memcpy(_buf.data + _size, data.get_data(), data.get_size());
break;
} else {
// wait allocate buffer pool
@@ -54,8 +55,8 @@ void S3FileBuffer::append_data(const Slice& data) {
}
void S3FileBuffer::submit() {
- if (LIKELY(_buf != nullptr)) {
- _stream_ptr = std::make_shared<StringViewStream>(_buf->data(), _size);
+ if (LIKELY(!_buf.empty())) {
+ _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
}
ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
@@ -69,37 +70,33 @@ S3FileBufferPool::S3FileBufferPool() {
(config::s3_write_buffer_whole_size >
config::s3_write_buffer_size));
LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
for (size_t i = 0; i < buf_num; i++) {
- auto buf = std::make_shared<S3FileBuffer>();
- buf->reserve_buffer();
- _free_buffers.emplace_back(std::move(buf));
+ Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
+ static_cast<size_t>(config::s3_write_buffer_size)};
+ _free_raw_buffers.emplace_back(s);
}
}
std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
- std::shared_ptr<S3FileBuffer> buf;
+ std::shared_ptr<S3FileBuffer> buf = std::make_shared<S3FileBuffer>();
// if need reserve then we must ensure return buf with memory preserved
if (reserve) {
{
std::unique_lock<std::mutex> lck {_lock};
- _cv.wait(lck, [this]() { return !_free_buffers.empty(); });
- buf = std::move(_free_buffers.front());
- _free_buffers.pop_front();
+ _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
+ buf->reserve_buffer(_free_raw_buffers.front());
+ _free_raw_buffers.pop_front();
}
return buf;
}
// try to get one memory reserved buffer
{
std::unique_lock<std::mutex> lck {_lock};
- if (!_free_buffers.empty()) {
- buf = std::move(_free_buffers.front());
- _free_buffers.pop_front();
+ if (!_free_raw_buffers.empty()) {
+ buf->reserve_buffer(_free_raw_buffers.front());
+ _free_raw_buffers.pop_front();
}
}
- if (buf != nullptr) {
- return buf;
- }
// if there is no free buffer and no need to reserve memory, we could return one empty buffer
- buf = std::make_shared<S3FileBuffer>();
// if the buf has no memory reserved, it would try to write the data to file cache first
// or it would try to rob buffer from other S3FileBuffer
return buf;
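For readers skimming the diff: the constructor-cost reduction comes from carving one big pre-allocated block into fixed-size Slices instead of constructing and resizing a std::string per buffer. A simplified sketch of the resulting constructor, assuming _whole_mem_buffer is sized to config::s3_write_buffer_whole_size (that allocation is not visible in the truncated hunk above):

    // Simplified sketch, not the verbatim patched code.
    S3FileBufferPool::S3FileBufferPool() {
        size_t buf_num = config::s3_write_buffer_whole_size / config::s3_write_buffer_size;
        // one allocation up front instead of buf_num string resizes
        _whole_mem_buffer = std::make_unique<char[]>(config::s3_write_buffer_whole_size);
        for (size_t i = 0; i < buf_num; i++) {
            Slice s {_whole_mem_buffer.get() + i * config::s3_write_buffer_size,
                     static_cast<size_t>(config::s3_write_buffer_size)};
            _free_raw_buffers.emplace_back(s);
        }
    }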
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h b/be/src/io/fs/s3_file_write_bufferpool.h
index b7b1b9f261..b69964b48e 100644
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ b/be/src/io/fs/s3_file_write_bufferpool.h
@@ -33,7 +33,7 @@
namespace doris {
namespace io {
-// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer 3. decouple reserved memory and Callbacks
+// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read buffer
struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
using Callback = std::function<void()>;
@@ -41,14 +41,13 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
~S3FileBuffer() = default;
void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
- _buf = std::move(other->_buf);
- other->_buf = nullptr;
+ _buf = other->_buf;
+ // we should clear other's memory buffer in case it would be reclaimed twice
+ // when calling on_finished
}
- void reserve_buffer() {
- _buf = std::make_unique<std::string>();
- _buf->resize(config::s3_write_buffer_size);
- }
+ void reserve_buffer(Slice s) { _buf = s; }
// append data into the memory buffer inside or into the file cache
// if the buffer has no memory buffer
@@ -109,7 +108,7 @@ struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
size_t _size;
std::shared_ptr<std::iostream> _stream_ptr;
// only served as one reserved buffer
- std::unique_ptr<std::string> _buf;
+ Slice _buf;
size_t _append_offset {0};
};
@@ -123,9 +122,9 @@ public:
return &_pool;
}
- void reclaim(std::shared_ptr<S3FileBuffer> buf) {
+ void reclaim(Slice buf) {
std::unique_lock<std::mutex> lck {_lock};
- _free_buffers.emplace_front(std::move(buf));
+ _free_raw_buffers.emplace_front(buf);
_cv.notify_all();
}
@@ -134,7 +133,8 @@ public:
private:
std::mutex _lock;
std::condition_variable _cv;
- std::list<std::shared_ptr<S3FileBuffer>> _free_buffers;
+ std::unique_ptr<char[]> _whole_mem_buffer;
+ std::list<Slice> _free_raw_buffers;
};
} // namespace io
} // namespace doris
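Since _buf is now a plain Slice rather than a moved-from unique_ptr, the explicit clear in rob_buffer() is what prevents the same slice from being reclaimed twice. A rough illustration (the direct calls are hypothetical; in the real code these paths are driven by the upload callbacks):

    auto owner  = S3FileBufferPool::GetInstance()->allocate(/*reserve=*/true);
    auto robber = S3FileBufferPool::GetInstance()->allocate(/*reserve=*/false);
    // suppose robber got no reserved memory and steals owner's slice
    robber->rob_buffer(owner);   // robber copies the Slice, owner's Slice is cleared
    robber->on_finished();       // the Slice goes back into _free_raw_buffers once
    owner->on_finished();        // _buf.empty() -> early return, no double reclaim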
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index e6eab1a21a..d31a144543 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -87,11 +87,15 @@ S3FileWriter::S3FileWriter(Path path, std::shared_ptr<S3Client> client, const S3
}
S3FileWriter::~S3FileWriter() {
- if (_opened) {
- close();
+ if (!_closed) {
+ // if we don't abort multi part upload, the uploaded part in object
+ // store will not automatically reclaim itself, it would cost more money
+ abort();
}
- CHECK(!_opened || _closed) << "open: " << _opened << ", closed: " << _closed;
+ CHECK(_closed) << ", closed: " << _closed;
// in case there are tasks which might run after this object is destroyed
+ // for example, if the whole task failed and some tasks are still pending
+ // in threadpool
_wait_until_finish("dtor");
s3_file_being_written << -1;
}
@@ -121,9 +125,10 @@ void S3FileWriter::_wait_until_finish(std::string task_name) {
}
Status S3FileWriter::abort() {
+ // make all pending work quit early
_failed = true;
- if (_closed || !_opened) {
- _wait_until_finish("Abort");
+ _closed = true;
+ if (_aborted) {
return Status::OK();
}
// we need to reclaim the memory
@@ -137,7 +142,6 @@ Status S3FileWriter::abort() {
}
VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
_wait_until_finish("Abort");
- _closed = true;
AbortMultipartUploadRequest request;
request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
auto outcome = _client->AbortMultipartUpload(request);
@@ -147,6 +151,7 @@ Status S3FileWriter::abort() {
LOG(INFO) << "Abort multipart upload successfully"
<< "bucket=" << _bucket << ", key=" << _path.native()
<< ", upload_id=" << _upload_id;
+ _aborted = true;
return Status::OK();
}
return Status::IOError("failed to abort multipart upload(bucket={}, key={}, upload_id={}): {}",
@@ -154,11 +159,17 @@ Status S3FileWriter::abort() {
}
Status S3FileWriter::close() {
- if (_closed || _failed) {
+ if (_closed) {
_wait_until_finish("close");
return _st;
}
+ _closed = true;
+ if (_failed) {
+ abort();
+ return _st;
+ }
VLOG_DEBUG << "S3FileWriter::close, path: " << _path.native();
+ // the file might be smaller than 5MB, so we do the upload here
if (_pending_buf != nullptr) {
if (_upload_id.empty()) {
_pending_buf->set_upload_remote_callback(
@@ -169,18 +180,11 @@ Status S3FileWriter::close() {
_pending_buf = nullptr;
}
RETURN_IF_ERROR(_complete());
- _closed = true;
return Status::OK();
}
Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
- // lazy open
- if (!_opened) {
- VLOG_DEBUG << "S3FileWriter::open, path: " << _path.native();
- _closed = false;
- _opened = true;
- }
DCHECK(!_closed);
size_t buffer_size = config::s3_write_buffer_size;
SCOPED_RAW_TIMER(_upload_cost_ms.get());
@@ -339,7 +343,7 @@ Status S3FileWriter::finalize() {
}
void S3FileWriter::_put_object(S3FileBuffer& buf) {
- DCHECK(!_closed && _opened) << "closed " << _closed << " opened " << _opened;
+ DCHECK(!_closed) << "closed " << _closed;
Aws::S3::Model::PutObjectRequest request;
request.WithBucket(_bucket).WithKey(_key);
request.SetBody(buf.get_stream());
@@ -351,6 +355,8 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
response.GetError().GetExceptionName(),
response.GetError().GetMessage(),
static_cast<int>(response.GetError().GetResponseCode()));
+ buf._on_failed(_st);
+ LOG(WARNING) << _st;
}
s3_bytes_written_total << buf.get_size();
s3_file_created_total << 1;
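Piecing the s3_file_writer.cpp hunks back together, the resulting error handling reads roughly as follows (a simplified reading, not the verbatim patched code): close() is now idempotent and routes a failed writer through abort(), and the destructor aborts anything that was never closed, so no uploaded parts linger in the object store.

    // Simplified reading of the patched control flow.
    Status S3FileWriter::close() {
        if (_closed) {                    // a second close() just waits and returns
            _wait_until_finish("close");
            return _st;
        }
        _closed = true;
        if (_failed) {                    // some buffer reported an error earlier
            static_cast<void>(abort());   // reclaim buffers + AbortMultipartUpload
            return _st;
        }
        // ... flush the pending (<5MB) buffer and complete the multipart upload ...
        return Status::OK();
    }

    S3FileWriter::~S3FileWriter() {
        if (!_closed) {
            static_cast<void>(abort());   // abort() also marks the writer closed
        }
        CHECK(_closed);
        _wait_until_finish("dtor");       // tasks may still be pending in the thread pool
    }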
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 8f04fbc0c3..26b77c42f8 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -109,7 +109,7 @@ private:
std::string _bucket;
std::string _key;
bool _closed = true;
- bool _opened = false;
+ bool _aborted = false;
std::unique_ptr<int64_t> _upload_cost_ms;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]