This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 59ebbb351ee [feature](merge-cloud) Enable write into cache when 
uploading file to s3 using s3 file writer (#24364)
59ebbb351ee is described below

commit 59ebbb351ee9164062147280a32543368d867126
Author: AlexYue <[email protected]>
AuthorDate: Mon Oct 16 21:31:02 2023 +0800

    [feature](merge-cloud) Enable write into cache when uploading file to s3 
using s3 file writer (#24364)
---
 be/src/common/config.cpp                       |   2 +
 be/src/common/config.h                         |   2 +
 be/src/io/cache/block/block_file_cache.cpp     |   8 +-
 be/src/io/cache/block/block_file_cache.h       |  20 +-
 be/src/io/cache/block/block_file_segment.cpp   |  30 ++
 be/src/io/cache/block/block_file_segment.h     |   4 +
 be/src/io/cache/block/block_lru_file_cache.cpp |  17 +
 be/src/io/cache/block/block_lru_file_cache.h   |   3 +
 be/src/io/fs/benchmark/fs_benchmark_tool.cpp   |  10 +-
 be/src/io/fs/s3_file_bufferpool.cpp            | 336 +++++++++++++++++
 be/src/io/fs/s3_file_bufferpool.h              | 356 ++++++++++++++++++
 be/src/io/fs/s3_file_write_bufferpool.cpp      | 109 ------
 be/src/io/fs/s3_file_write_bufferpool.h        | 150 --------
 be/src/io/fs/s3_file_writer.cpp                | 168 +++++++--
 be/src/io/fs/s3_file_writer.h                  |  15 +-
 be/src/io/io_common.h                          |   2 +
 be/src/runtime/exec_env.h                      |   3 +
 be/src/runtime/exec_env_init.cpp               |  11 +-
 be/src/service/doris_main.cpp                  |   1 -
 be/test/io/fs/remote_file_system_test.cpp      |   4 +-
 be/test/io/fs/s3_file_writer_test.cpp          | 479 +++++++++++++++++++++++++
 21 files changed, 1411 insertions(+), 319 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 85344697772..852aba47856 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1105,6 +1105,8 @@ DEFINE_mInt32(scan_thread_nice_value, "0");
 DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");
 
 DEFINE_Bool(exit_on_exception, "false");
+// This config controls whether the s3 file writer would flush cache 
asynchronously
+DEFINE_Bool(enable_flush_file_cache_async, "true");
 
 // cgroup
 DEFINE_String(doris_cgroup_cpu_path, "");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4207b354410..caca2255cb6 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1178,6 +1178,8 @@ DECLARE_mBool(exit_on_exception);
 // cgroup
 DECLARE_String(doris_cgroup_cpu_path);
 DECLARE_Bool(enable_cpu_hard_limit);
+// This config controls whether the s3 file writer would flush cache 
asynchronously
+DECLARE_Bool(enable_flush_file_cache_async);
 
 // Remove predicate that is always true for a segment.
 DECLARE_Bool(ignore_always_true_predicate_for_segment);
diff --git a/be/src/io/cache/block/block_file_cache.cpp 
b/be/src/io/cache/block/block_file_cache.cpp
index 2b7b8cb1343..54df255abe5 100644
--- a/be/src/io/cache/block/block_file_cache.cpp
+++ b/be/src/io/cache/block/block_file_cache.cpp
@@ -20,6 +20,7 @@
 
 #include "io/cache/block/block_file_cache.h"
 
+#include <fmt/core.h>
 #include <glog/logging.h>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <sys/resource.h>
@@ -57,7 +58,7 @@ IFileCache::Key IFileCache::hash(const std::string& path) {
     return Key(key);
 }
 
-std::string IFileCache::cache_type_to_string(CacheType type) {
+std::string_view IFileCache::cache_type_to_string(CacheType type) {
     switch (type) {
     case CacheType::INDEX:
         return "_idx";
@@ -65,6 +66,8 @@ std::string IFileCache::cache_type_to_string(CacheType type) {
         return "_disposable";
     case CacheType::NORMAL:
         return "";
+    case CacheType::TTL:
+        return "_ttl";
     }
     return "";
 }
@@ -83,8 +86,7 @@ CacheType IFileCache::string_to_cache_type(const std::string& 
str) {
 
 std::string IFileCache::get_path_in_local_cache(const Key& key, size_t offset,
                                                 CacheType type) const {
-    return get_path_in_local_cache(key) + "/" +
-           (std::to_string(offset) + cache_type_to_string(type));
+    return fmt::format("{}/{}{}", get_path_in_local_cache(key), offset, 
cache_type_to_string(type));
 }
 
 std::string IFileCache::get_path_in_local_cache(const Key& key) const {
diff --git a/be/src/io/cache/block/block_file_cache.h 
b/be/src/io/cache/block/block_file_cache.h
index e58e873af03..8c2a34c34f9 100644
--- a/be/src/io/cache/block/block_file_cache.h
+++ b/be/src/io/cache/block/block_file_cache.h
@@ -40,6 +40,7 @@
 #include "io/fs/file_reader.h"
 #include "io/io_common.h"
 #include "util/hash_util.hpp"
+#include "util/lock.h"
 #include "vec/common/uint128.h"
 
 namespace doris {
@@ -54,22 +55,28 @@ enum CacheType {
     INDEX,
     NORMAL,
     DISPOSABLE,
+    TTL,
 };
 
 struct CacheContext {
-    CacheContext(const IOContext* io_ctx) {
-        if (io_ctx->is_index_data) {
+    CacheContext(const IOContext* io_context) {
+        if (io_context->is_index_data) {
             cache_type = CacheType::INDEX;
-        } else if (io_ctx->is_disposable) {
+        } else if (io_context->is_disposable) {
             cache_type = CacheType::DISPOSABLE;
+        } else if (io_context->expiration_time != 0) {
+            cache_type = CacheType::TTL;
+            expiration_time = io_context->expiration_time;
         } else {
             cache_type = CacheType::NORMAL;
         }
-        query_id = io_ctx->query_id ? *io_ctx->query_id : TUniqueId();
+        query_id = io_context->query_id ? *io_context->query_id : TUniqueId();
     }
     CacheContext() = default;
     TUniqueId query_id;
     CacheType cache_type;
+    int64_t expiration_time {0};
+    bool is_cold_data {false};
 };
 
 /**
@@ -139,7 +146,10 @@ public:
 
     virtual size_t get_file_segments_num(CacheType type) const = 0;
 
-    static std::string cache_type_to_string(CacheType type);
+    virtual void change_cache_type(const Key& key, size_t offset, CacheType 
new_type,
+                                   std::lock_guard<doris::Mutex>& cache_lock) 
= 0;
+
+    static std::string_view cache_type_to_string(CacheType type);
     static CacheType string_to_cache_type(const std::string& str);
 
     IFileCache& operator=(const IFileCache&) = delete;
diff --git a/be/src/io/cache/block/block_file_segment.cpp 
b/be/src/io/cache/block/block_file_segment.cpp
index 3b179de343d..4e6b09908cd 100644
--- a/be/src/io/cache/block/block_file_segment.cpp
+++ b/be/src/io/cache/block/block_file_segment.cpp
@@ -196,6 +196,36 @@ Status FileBlock::read_at(Slice buffer, size_t 
read_offset) {
     return st;
 }
 
+bool FileBlock::change_cache_type(CacheType new_type) {
+    std::unique_lock segment_lock(_mutex);
+    if (new_type == _cache_type) {
+        return true;
+    }
+    if (_download_state == State::DOWNLOADED) {
+        std::error_code ec;
+        std::filesystem::rename(get_path_in_local_cache(),
+                                _cache->get_path_in_local_cache(key(), 
offset(), new_type), ec);
+        if (ec) {
+            LOG(ERROR) << "change cache type failed due to rename error " << 
ec.message();
+            return false;
+        }
+    }
+    _cache_type = new_type;
+    return true;
+}
+
+Status FileBlock::change_cache_type_self(CacheType new_type) {
+    std::lock_guard cache_lock(_cache->_mutex);
+    std::unique_lock segment_lock(_mutex);
+    Status st = Status::OK();
+    if (_cache_type == CacheType::TTL || new_type == _cache_type) {
+        return st;
+    }
+    _cache_type = new_type;
+    _cache->change_cache_type(_file_key, _segment_range.left, new_type, 
cache_lock);
+    return st;
+}
+
 Status FileBlock::finalize_write() {
     std::lock_guard segment_lock(_mutex);
 
diff --git a/be/src/io/cache/block/block_file_segment.h 
b/be/src/io/cache/block/block_file_segment.h
index b462259931c..24a4e6e174e 100644
--- a/be/src/io/cache/block/block_file_segment.h
+++ b/be/src/io/cache/block/block_file_segment.h
@@ -138,6 +138,10 @@ public:
 
     std::string get_path_in_local_cache() const;
 
+    bool change_cache_type(CacheType new_type);
+
+    Status change_cache_type_self(CacheType new_type);
+
     State state_unlock(std::lock_guard<std::mutex>&) const;
 
     FileBlock& operator=(const FileBlock&) = delete;
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp 
b/be/src/io/cache/block/block_lru_file_cache.cpp
index 03a291ba74d..0578c82e34e 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -1005,6 +1005,23 @@ size_t 
LRUFileCache::get_file_segments_num_unlocked(CacheType cache_type,
     return get_queue(cache_type).get_elements_num(cache_lock);
 }
 
+void LRUFileCache::change_cache_type(const IFileCache::Key& key, size_t 
offset, CacheType new_type,
+                                     std::lock_guard<doris::Mutex>& 
cache_lock) {
+    if (auto iter = _files.find(key); iter != _files.end()) {
+        auto& file_blocks = iter->second;
+        if (auto cell_it = file_blocks.find(offset); cell_it != 
file_blocks.end()) {
+            FileBlockCell& cell = cell_it->second;
+            auto& cur_queue = get_queue(cell.cache_type);
+            cell.cache_type = new_type;
+            DCHECK(cell.queue_iterator.has_value());
+            cur_queue.remove(*cell.queue_iterator, cache_lock);
+            auto& new_queue = get_queue(new_type);
+            cell.queue_iterator =
+                    new_queue.add(key, offset, 
cell.file_block->range().size(), cache_lock);
+        }
+    }
+}
+
 LRUFileCache::FileBlockCell::FileBlockCell(FileBlockSPtr file_block, CacheType 
cache_type,
                                            std::lock_guard<std::mutex>& 
cache_lock)
         : file_block(file_block), cache_type(cache_type) {
diff --git a/be/src/io/cache/block/block_lru_file_cache.h 
b/be/src/io/cache/block/block_lru_file_cache.h
index 5a15b10ba2d..b75218c09e2 100644
--- a/be/src/io/cache/block/block_lru_file_cache.h
+++ b/be/src/io/cache/block/block_lru_file_cache.h
@@ -158,6 +158,9 @@ private:
     void remove(FileBlockSPtr file_block, std::lock_guard<std::mutex>& 
cache_lock,
                 std::lock_guard<std::mutex>& segment_lock) override;
 
+    void change_cache_type(const Key& key, size_t offset, CacheType new_type,
+                           std::lock_guard<doris::Mutex>& cache_lock) override;
+
     size_t get_available_cache_size(CacheType cache_type) const;
 
     Status load_cache_info_into_memory(std::lock_guard<std::mutex>& 
cache_lock);
diff --git a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp 
b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
index 2a6ad2db3d4..0ca9edc530a 100644
--- a/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
+++ b/be/src/io/fs/benchmark/fs_benchmark_tool.cpp
@@ -20,7 +20,7 @@
 #include <fstream>
 
 #include "io/fs/benchmark/benchmark_factory.hpp"
-#include "io/fs/s3_file_write_bufferpool.h"
+#include "io/fs/s3_file_bufferpool.h"
 #include "util/cpu_info.h"
 #include "util/threadpool.h"
 
@@ -114,13 +114,13 @@ int main(int argc, char** argv) {
     int num_cores = doris::CpuInfo::num_cores();
 
     // init s3 write buffer pool
-    std::unique_ptr<doris::ThreadPool> buffered_reader_prefetch_thread_pool;
-    
static_cast<void>(doris::ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+    std::unique_ptr<doris::ThreadPool> s3_file_upload_thread_pool;
+    static_cast<void>(doris::ThreadPoolBuilder("S3FileUploadThreadPool")
                               .set_min_threads(num_cores)
                               .set_max_threads(num_cores)
-                              .build(&buffered_reader_prefetch_thread_pool));
+                              .build(&s3_file_upload_thread_pool));
     doris::io::S3FileBufferPool* s3_buffer_pool = 
doris::io::S3FileBufferPool::GetInstance();
-    s3_buffer_pool->init(524288000, 5242880, 
buffered_reader_prefetch_thread_pool.get());
+    s3_buffer_pool->init(524288000, 5242880, s3_file_upload_thread_pool.get());
 
     try {
         doris::io::MultiBenchmark multi_bm(FLAGS_fs_type, FLAGS_operation, 
std::stoi(FLAGS_threads),
diff --git a/be/src/io/fs/s3_file_bufferpool.cpp 
b/be/src/io/fs/s3_file_bufferpool.cpp
new file mode 100644
index 00000000000..b421bfb1f71
--- /dev/null
+++ b/be/src/io/fs/s3_file_bufferpool.cpp
@@ -0,0 +1,336 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "s3_file_bufferpool.h"
+
+#include "common/config.h"
+#include "common/logging.h"
+#include "io/cache/block/block_file_segment.h"
+#include "io/fs/s3_common.h"
+#include "runtime/exec_env.h"
+#include "util/defer_op.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace io {
+
+/**
+ * 0. check if the inner memory buffer is empty or not
+ * 1. reclaim the memory buffer if it's not empty
+ */
+void FileBuffer::on_finish() {
+    if (_buffer.empty()) {
+        return;
+    }
+    S3FileBufferPool::GetInstance()->reclaim(Slice {_buffer.get_data(), 
_capacity});
+    _buffer.clear();
+}
+
+/**
+ * take other buffer's memory space and refresh capacity
+ */
+void FileBuffer::swap_buffer(Slice& other) {
+    _buffer = other;
+    _capacity = _buffer.get_size();
+    other.clear();
+}
+
+FileBuffer::FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, 
size_t offset,
+                       OperationState state, bool reserve)
+        : _alloc_holder(std::move(alloc_holder)),
+          _buffer(S3FileBufferPool::GetInstance()->allocate(reserve)),
+          _offset(offset),
+          _size(0),
+          _state(std::move(state)),
+          _capacity(_buffer.get_size()) {}
+
+/**
+ * 0. check if file cache holder allocated
+ * 1. update the cache's type to index cache
+ */
+void UploadFileBuffer::set_index_offset(size_t offset) {
+    _index_offset = offset;
+    if (_holder) {
+        bool change_to_index_cache = false;
+        for (auto iter = _holder->file_segments.begin(); iter != 
_holder->file_segments.end();
+             ++iter) {
+            if (iter == _cur_file_segment) {
+                change_to_index_cache = true;
+            }
+            if (change_to_index_cache) {
+                
static_cast<void>((*iter)->change_cache_type_self(CacheType::INDEX));
+            }
+        }
+    }
+}
+
+/**
+ * 0. when there is memory preserved, directly write data to buf
+ * 1. otherwise write to file cache, then wait for a free buffer and rob it
+ */
+Status UploadFileBuffer::append_data(const Slice& data) {
+    Defer defer {[&] { _size += data.get_size(); }};
+    while (true) {
+        // if buf is not empty, it means there is memory preserved for this buf
+        if (!_buffer.empty()) {
+            std::memcpy((void*)(_buffer.get_data() + _size), data.get_data(), 
data.get_size());
+            break;
+        }
+        // if the buf has no memory reserved, then write to disk first
+        if (!_is_cache_allocated && config::enable_file_cache && _alloc_holder 
!= nullptr) {
+            _holder = _alloc_holder();
+            bool cache_is_not_enough = false;
+            for (auto& segment : _holder->file_segments) {
+                DCHECK(segment->state() == FileBlock::State::SKIP_CACHE ||
+                       segment->state() == FileBlock::State::EMPTY);
+                if (segment->state() == FileBlock::State::SKIP_CACHE) 
[[unlikely]] {
+                    cache_is_not_enough = true;
+                    break;
+                }
+                if (_index_offset != 0) {
+                    
static_cast<void>(segment->change_cache_type_self(CacheType::INDEX));
+                }
+            }
+            // if cache_is_not_enough, cannot use it !
+            _cur_file_segment = _holder->file_segments.begin();
+            _append_offset = (*_cur_file_segment)->range().left;
+            _holder = cache_is_not_enough ? nullptr : std::move(_holder);
+            if (_holder) {
+                (*_cur_file_segment)->get_or_set_downloader();
+            }
+            _is_cache_allocated = true;
+        }
+        if (_holder) [[likely]] {
+            size_t data_remain_size = data.get_size();
+            size_t pos = 0;
+            while (data_remain_size != 0) {
+                auto range = (*_cur_file_segment)->range();
+                size_t segment_remain_size = range.right - _append_offset + 1;
+                size_t append_size = std::min(data_remain_size, 
segment_remain_size);
+                Slice append_data(data.get_data() + pos, append_size);
+                // When there is no available free memory buffer, the data 
will be written to the cache first
+                // and then uploaded to S3 when there is an available free 
memory buffer.
+                // However, if an error occurs during the write process to the 
local cache,
+                // continuing to upload the dirty data from the cache to S3 
will result in erroneous data(Bad segment).
+                // Considering that local disk write failures are rare, a 
simple approach is chosen here,
+                // which is to treat the import as a failure directly when a 
local write failure occurs
+                RETURN_IF_ERROR((*_cur_file_segment)->append(append_data));
+                if (segment_remain_size == append_size) {
+                    RETURN_IF_ERROR((*_cur_file_segment)->finalize_write());
+                    if (++_cur_file_segment != _holder->file_segments.end()) {
+                        (*_cur_file_segment)->get_or_set_downloader();
+                    }
+                }
+                data_remain_size -= append_size;
+                _append_offset += append_size;
+                pos += append_size;
+            }
+            break;
+        } else {
+            // wait allocate buffer pool
+            auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+            swap_buffer(tmp);
+        }
+    }
+    return Status::OK();
+}
+
+/**
+ * 0. allocate one memory buffer
+ * 1. read the content from the cache and then write
+ * it into memory buffer
+ */
+void UploadFileBuffer::read_from_cache() {
+    auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
+    swap_buffer(tmp);
+
+    DCHECK(_holder != nullptr);
+    DCHECK(_capacity >= _size);
+    size_t pos = 0;
+    for (auto& segment : _holder->file_segments) {
+        if (pos == _size) {
+            break;
+        }
+        if (auto s = segment->finalize_write(); !s.ok()) [[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+        size_t segment_size = segment->range().size();
+        Slice s(_buffer.get_data() + pos, segment_size);
+        if (auto st = segment->read_at(s, 0); !st.ok()) [[unlikely]] {
+            set_val(std::move(st));
+            return;
+        }
+        pos += segment_size;
+    }
+
+    // the real length should be the buf.get_size() in this situation (consider 
it's the last part,
+    // size of it could be less than 5MB)
+    _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+}
+
+/**
+ * 0. construct the stream ptr if the buffer is not empty
+ * 1. submit the on_upload() callback to executor
+ */
+void UploadFileBuffer::submit() {
+    if (!_buffer.empty()) [[likely]] {
+        _stream_ptr = std::make_shared<StringViewStream>(_buffer.get_data(), 
_size);
+    }
+    // If the data is written into file cache
+    if (_holder && _cur_file_segment != _holder->file_segments.end()) {
+        if (auto s = (*_cur_file_segment)->finalize_write(); !s.ok()) 
[[unlikely]] {
+            set_val(std::move(s));
+            return;
+        }
+    }
+    
static_cast<void>(S3FileBufferPool::GetInstance()->thread_pool()->submit_func(
+            [buf = this->shared_from_this(), this]() {
+                // to extend buf's lifetime
+                // (void)buf;
+                on_upload();
+            }));
+}
+
+/**
+ * write the content of the memory buffer to local file cache
+ */
+void UploadFileBuffer::upload_to_local_file_cache(bool is_cancelled) {
+    if (!config::enable_file_cache || _alloc_holder == nullptr) {
+        return;
+    }
+    if (_holder) {
+        return;
+    }
+    if (is_cancelled) {
+        return;
+    }
+    // the data is already written to S3 in this situation
+    // so the file cache write error is not handled here
+    _holder = _alloc_holder();
+    size_t pos = 0;
+    size_t data_remain_size = _size;
+    for (auto& segment : _holder->file_segments) {
+        if (data_remain_size == 0) {
+            break;
+        }
+        size_t segment_size = segment->range().size();
+        size_t append_size = std::min(data_remain_size, segment_size);
+        if (segment->state() == FileBlock::State::EMPTY) {
+            if (_index_offset != 0 && segment->range().right >= _index_offset) 
{
+                // segment->change_cache_type_self(CacheType::INDEX);
+            }
+            segment->get_or_set_downloader();
+            // Another thread may have started downloading due to a query
+            // Just skip putting to cache from UploadFileBuffer
+            if (segment->is_downloader()) {
+                Slice s(_buffer.get_data() + pos, append_size);
+                if (auto st = segment->append(s); !st.ok()) [[unlikely]] {
+                    LOG_WARNING("append data to cache segmetn failed due to 
{}", st);
+                    return;
+                }
+                if (auto st = segment->finalize_write(); !st.ok()) 
[[unlikely]] {
+                    LOG_WARNING("finalize write to cache segmetn failed due to 
{}", st);
+                    return;
+                }
+            }
+        }
+        data_remain_size -= append_size;
+        pos += append_size;
+    }
+}
+
+FileBufferBuilder& FileBufferBuilder::set_type(BufferType type) {
+    _type = type;
+    return *this;
+}
+FileBufferBuilder& FileBufferBuilder::set_upload_callback(
+        std::function<void(UploadFileBuffer& buf)> cb) {
+    _upload_cb = std::move(cb);
+    return *this;
+}
+// set callback to do task sync for the caller
+FileBufferBuilder& 
FileBufferBuilder::set_sync_after_complete_task(std::function<bool(Status)> cb) 
{
+    _sync_after_complete_task = std::move(cb);
+    return *this;
+}
+
+FileBufferBuilder& FileBufferBuilder::set_allocate_file_segments_holder(
+        std::function<FileBlocksHolderPtr()> cb) {
+    _alloc_holder_cb = std::move(cb);
+    return *this;
+}
+
+std::shared_ptr<FileBuffer> FileBufferBuilder::build() {
+    OperationState state(_sync_after_complete_task, _is_cancelled);
+    if (_type == BufferType::UPLOAD) {
+        return std::make_shared<UploadFileBuffer>(std::move(_upload_cb), 
std::move(state), _offset,
+                                                  std::move(_alloc_holder_cb), 
_index_offset);
+    }
+    // should never come here
+    return nullptr;
+}
+
+void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t 
s3_write_buffer_size,
+                            ThreadPool* thread_pool) {
+    // the nums could be one configuration
+    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
+    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
+           (s3_write_buffer_whole_size > s3_write_buffer_size))
+            << "s3 write buffer size " << s3_write_buffer_size << " whole s3 
write buffer size "
+            << s3_write_buffer_whole_size;
+    LOG_INFO("S3 file buffer pool with {} buffers, each with {}", buf_num, 
s3_write_buffer_size);
+    _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
+    for (size_t i = 0; i < buf_num; i++) {
+        Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
+                 static_cast<size_t>(s3_write_buffer_size)};
+        _free_raw_buffers.emplace_back(s);
+    }
+    _thread_pool = thread_pool;
+}
+
+Slice S3FileBufferPool::allocate(bool reserve) {
+    Slice buf;
+    // if need reserve or no cache then we must ensure return buf with memory 
preserved
+    if (reserve || !config::enable_file_cache) {
+        {
+            std::unique_lock<std::mutex> lck {_lock};
+            _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
+            buf = _free_raw_buffers.front();
+            _free_raw_buffers.pop_front();
+        }
+        return buf;
+    }
+    // try to get one memory reserved buffer
+    {
+        std::unique_lock<std::mutex> lck {_lock};
+        if (!_free_raw_buffers.empty()) {
+            buf = _free_raw_buffers.front();
+            _free_raw_buffers.pop_front();
+        }
+    }
+    if (!buf.empty()) {
+        return buf;
+    }
+    // if there is no free buffer and no need to reserve memory, we could 
return one empty buffer
+    buf = Slice();
+    // if the buf has no memory reserved, it would try to write the data to 
file cache first
+    // or it would try to rob buffer from other S3FileBuffer
+    return buf;
+}
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/s3_file_bufferpool.h 
b/be/src/io/fs/s3_file_bufferpool.h
new file mode 100644
index 00000000000..8dd61aac4de
--- /dev/null
+++ b/be/src/io/fs/s3_file_bufferpool.h
@@ -0,0 +1,356 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <cstdint>
+#include <fstream>
+#include <functional>
+#include <list>
+#include <memory>
+#include <mutex>
+
+#include "common/status.h"
+#include "io/cache/block/block_file_segment.h"
+#include "runtime/exec_env.h"
+#include "util/slice.h"
+#include "util/threadpool.h"
+
+namespace doris {
+namespace io {
+enum class BufferType { DOWNLOAD, UPLOAD };
+using FileBlocksHolderPtr = std::unique_ptr<FileBlocksHolder>;
+struct OperationState {
+    OperationState(std::function<bool(Status)> sync_after_complete_task,
+                   std::function<bool()> is_cancelled)
+            : _sync_after_complete_task(std::move(sync_after_complete_task)),
+              _is_cancelled(std::move(is_cancelled)) {}
+    /**
+    * set the val of this operation state which indicates it failed or 
succeeded
+    *
+    * @param S the execution result
+    */
+    void set_val(Status s = Status::OK()) {
+        // make sure we wouldn't sync twice
+        if (_value_set) [[unlikely]] {
+            return;
+        }
+        if (nullptr != _sync_after_complete_task) {
+            _fail_after_sync = _sync_after_complete_task(s);
+        }
+        _value_set = true;
+    }
+
+    /**
+    * detect whether the execution task is done
+    *
+    * @return whether the execution task is done
+    */
+    [[nodiscard]] bool is_cancelled() const {
+        DCHECK(nullptr != _is_cancelled);
+        // If _fail_after_sync is true then it means the sync task already 
returns
+        // that the task failed and if the outside file writer might already be
+        // destructed
+        return _fail_after_sync ? true : _is_cancelled();
+    }
+
+    std::function<bool(Status)> _sync_after_complete_task;
+    std::function<bool()> _is_cancelled;
+    bool _value_set = false;
+    bool _fail_after_sync = false;
+};
+
+struct FileBuffer : public std::enable_shared_from_this<FileBuffer> {
+    FileBuffer(std::function<FileBlocksHolderPtr()> alloc_holder, size_t 
offset,
+               OperationState state, bool reserve = false);
+    virtual ~FileBuffer() { on_finish(); }
+    /**
+    * submit the corresponding task to async executor
+    */
+    virtual void submit() = 0;
+    /**
+    * append data to the inner memory buffer
+    *
+    * @param S the content to be appended
+    */
+    virtual Status append_data(const Slice& s) = 0;
+    /**
+    * call the reclaim callback when task is done 
+    */
+    void on_finish();
+    /**
+    * swap memory buffer
+    *
+    * @param other which has memory buffer allocated
+    */
+    void swap_buffer(Slice& other);
+    /**
+    * set the val of it's operation state
+    *
+    * @param S the execution result
+    */
+    void set_val(Status s) { _state.set_val(s); }
+    /**
+    * get the start offset of this file buffer
+    *
+    * @return start offset of this file buffer
+    */
+    size_t get_file_offset() const { return _offset; }
+    /**
+    * get the size of the buffered data
+    *
+    * @return the size of the buffered data
+    */
+    size_t get_size() const { return _size; }
+    /**
+    * detect whether the execution task is done
+    *
+    * @return whether the execution task is done
+    */
+    bool is_cancelled() const { return _state.is_cancelled(); }
+
+    std::function<FileBlocksHolderPtr()> _alloc_holder;
+    Slice _buffer;
+    size_t _offset;
+    size_t _size;
+    OperationState _state;
+    size_t _capacity;
+};
+
+struct UploadFileBuffer final : public FileBuffer {
+    UploadFileBuffer(std::function<void(UploadFileBuffer&)> upload_cb, 
OperationState state,
+                     size_t offset, std::function<FileBlocksHolderPtr()> 
alloc_holder,
+                     size_t index_offset)
+            : FileBuffer(alloc_holder, offset, state),
+              _upload_to_remote(std::move(upload_cb)),
+              _index_offset(index_offset) {}
+    ~UploadFileBuffer() override = default;
+    void submit() override;
+    /**
+    * set the index offset
+    *
+    * @param offset the index offset
+    */
+    void set_index_offset(size_t offset);
+    Status append_data(const Slice& s) override;
+    /**
+    * read the content from local file cache
+    * because of a previous lack of a memory buffer
+    */
+    void read_from_cache();
+    /**
+    * write the content inside memory buffer into 
+    * local file cache
+    */
+    void upload_to_local_file_cache(bool);
+    /**
+    * do the upload work
+    * 1. read from cache if the data is written to cache first
+    * 2. upload content of buffer to S3
+    * 3. upload content to file cache if necessary
+    * 4. call the finish callback caller specified
+    * 5. reclaim self
+    */
+    void on_upload() {
+        if (_buffer.empty()) {
+            read_from_cache();
+        }
+        _upload_to_remote(*this);
+        if (config::enable_flush_file_cache_async) {
+            // If we call is_cancelled() after _state.set_val() then there 
might be one situation where
+            // s3 file writer is already destructed
+            bool cancelled = is_cancelled();
+            _state.set_val();
+            // this control flow means the buf and the stream share the same memory
+            // so we can directly use buf here
+            upload_to_local_file_cache(cancelled);
+        } else {
+            upload_to_local_file_cache(is_cancelled());
+            _state.set_val();
+        }
+        on_finish();
+    }
+    /**
+    *
+    * @return the stream representing the inner memory buffer
+    */
+    std::shared_ptr<std::iostream> get_stream() const { return _stream_ptr; }
+
+    /**
+    * Currently only used for small file to set callback
+    */
+    void set_upload_to_remote(std::function<void(UploadFileBuffer&)> cb) {
+        _upload_to_remote = std::move(cb);
+    }
+
+private:
+    std::function<void(UploadFileBuffer&)> _upload_to_remote = nullptr;
+    std::shared_ptr<std::iostream> _stream_ptr; // point to _buffer.get_data()
+
+    bool _is_cache_allocated {false};
+    FileBlocksHolderPtr _holder;
+    decltype(_holder->file_segments.begin()) _cur_file_segment;
+    size_t _append_offset {0};
+    size_t _index_offset {0};
+};
+
+struct FileBufferBuilder {
+    FileBufferBuilder() = default;
+    ~FileBufferBuilder() = default;
+    /**
+    * build one file buffer using previously set properties
+    * @return the file buffer's base shared pointer
+    */
+    std::shared_ptr<FileBuffer> build();
+    /**
+    * set the file buffer type
+    *
+    * @param type enum class for buffer type
+    */
+    FileBufferBuilder& set_type(BufferType type);
+    /**
+    * set the download callback which would download the content from the cloud into 
file buffer
+    *
+    * @param cb 
+    */
+    FileBufferBuilder& set_download_callback(std::function<Status(Slice&)> cb) 
{
+        _download = std::move(cb);
+        return *this;
+    }
+    /**
+    * set the upload callback which would upload the content inside buffer 
into remote storage
+    *
+    * @param cb 
+    */
+    FileBufferBuilder& 
set_upload_callback(std::function<void(UploadFileBuffer& buf)> cb);
+    /**
+    * set the callback which would do task sync for the caller
+    *
+    * @param cb 
+    */
+    FileBufferBuilder& 
set_sync_after_complete_task(std::function<bool(Status)> cb);
+    /**
+    * set the callback which detect whether the task is done
+    *
+    * @param cb 
+    */
+    FileBufferBuilder& set_is_cancelled(std::function<bool()> cb) {
+        _is_cancelled = std::move(cb);
+        return *this;
+    }
+    /**
+    * set the callback which allocate file cache segment holder
+    * **Notice**: Because the load file cache workload could be done
+    * asynchronously so you must make sure all the dependencies of this
+    * cb could last until this cb is invoked
+    * @param cb 
+    */
+    FileBufferBuilder& 
set_allocate_file_segments_holder(std::function<FileBlocksHolderPtr()> cb);
+    /**
+    * set the file offset of the file buffer
+    *
+    * @param cb 
+    */
+    FileBufferBuilder& set_file_offset(size_t offset) {
+        _offset = offset;
+        return *this;
+    }
+    /**
+    * set the index offset of the file buffer
+    *
+    * @param cb 
+    */
+    FileBufferBuilder& set_index_offset(size_t index_offset) {
+        _index_offset = index_offset;
+        return *this;
+    }
+    /**
+    * set the callback which write the content into local file cache
+    *
+    * @param cb 
+    */
+    FileBufferBuilder& set_write_to_local_file_cache(
+            std::function<void(FileBlocksHolderPtr, Slice)> cb) {
+        _write_to_local_file_cache = std::move(cb);
+        return *this;
+    }
+    /**
+    * set the callback which would write the downloaded content into user's 
buffer
+    *
+    * @param cb 
+    */
+    FileBufferBuilder& set_write_to_use_buffer(std::function<void(Slice, 
size_t)> cb) {
+        _write_to_use_buffer = std::move(cb);
+        return *this;
+    }
+
+    BufferType _type;
+    std::function<void(UploadFileBuffer& buf)> _upload_cb = nullptr;
+    std::function<bool(Status)> _sync_after_complete_task = nullptr;
+    std::function<FileBlocksHolderPtr()> _alloc_holder_cb = nullptr;
+    std::function<bool()> _is_cancelled = nullptr;
+    std::function<void(FileBlocksHolderPtr, Slice)> _write_to_local_file_cache;
+    std::function<Status(Slice&)> _download;
+    std::function<void(Slice, size_t)> _write_to_use_buffer;
+    size_t _offset;
+    size_t _index_offset;
+};
+
+class S3FileBufferPool {
+public:
+    S3FileBufferPool() = default;
+    ~S3FileBufferPool() = default;
+
+    // should be called once and only once
+    // at startup
+    void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
+              doris::ThreadPool* thread_pool);
+
+    /**
+    *
+    * @return singleton of the S3FileBufferPool
+    */
+    static S3FileBufferPool* GetInstance() {
+        return ExecEnv::GetInstance()->get_s3_file_buffer_pool();
+    }
+
+    void reclaim(Slice buf) {
+        std::unique_lock<std::mutex> lck {_lock};
+        _free_raw_buffers.emplace_front(buf);
+        _cv.notify_all();
+    }
+
+    /**
+    *
+    * @param reserve if true, the returned buffer must have memory allocated
+    * @return memory buffer
+    */
+    Slice allocate(bool reserve = false);
+
+    ThreadPool* thread_pool() { return _thread_pool; }
+
+private:
+    std::mutex _lock;
+    std::condition_variable _cv;
+    std::unique_ptr<char[]> _whole_mem_buffer;
+    std::list<Slice> _free_raw_buffers;
+    // not owned
+    ThreadPool* _thread_pool = nullptr;
+};
+} // namespace io
+} // namespace doris
diff --git a/be/src/io/fs/s3_file_write_bufferpool.cpp 
b/be/src/io/fs/s3_file_write_bufferpool.cpp
deleted file mode 100644
index 30b927a2fb9..00000000000
--- a/be/src/io/fs/s3_file_write_bufferpool.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "s3_file_write_bufferpool.h"
-
-#include <cstring>
-
-#include "common/config.h"
-#include "common/logging.h"
-#include "io/fs/s3_common.h"
-#include "runtime/exec_env.h"
-#include "util/defer_op.h"
-#include "util/threadpool.h"
-
-namespace doris {
-namespace io {
-void S3FileBuffer::on_finished() {
-    if (_buf.empty()) {
-        return;
-    }
-    reset();
-    S3FileBufferPool::GetInstance()->reclaim(_buf);
-    _buf.clear();
-}
-
-// when there is memory preserved, directly write data to buf
-// TODO:(AlexYue): write to file cache otherwise, then we'll wait for free 
buffer
-// and to rob it
-void S3FileBuffer::append_data(const Slice& data) {
-    Defer defer {[&] { _size += data.get_size(); }};
-    while (true) {
-        // if buf is not empty, it means there is memory preserved for this buf
-        if (!_buf.empty()) {
-            memcpy(_buf.data + _size, data.get_data(), data.get_size());
-            break;
-        } else {
-            // wait allocate buffer pool
-            auto tmp = S3FileBufferPool::GetInstance()->allocate(true);
-            rob_buffer(tmp);
-        }
-    }
-}
-
-void S3FileBuffer::submit() {
-    if (LIKELY(!_buf.empty())) {
-        _stream_ptr = std::make_shared<StringViewStream>(_buf.data, _size);
-    }
-
-    static_cast<void>(
-            _thread_pool->submit_func([buf = this->shared_from_this()]() { 
buf->_on_upload(); }));
-}
-
-void S3FileBufferPool::init(int32_t s3_write_buffer_whole_size, int32_t 
s3_write_buffer_size,
-                            doris::ThreadPool* thread_pool) {
-    // the nums could be one configuration
-    size_t buf_num = s3_write_buffer_whole_size / s3_write_buffer_size;
-    DCHECK((s3_write_buffer_size >= 5 * 1024 * 1024) &&
-           (s3_write_buffer_whole_size > s3_write_buffer_size));
-    LOG_INFO("S3 file buffer pool with {} buffers", buf_num);
-    _whole_mem_buffer = std::make_unique<char[]>(s3_write_buffer_whole_size);
-    for (size_t i = 0; i < buf_num; i++) {
-        Slice s {_whole_mem_buffer.get() + i * s3_write_buffer_size,
-                 static_cast<size_t>(s3_write_buffer_size)};
-        _free_raw_buffers.emplace_back(s);
-    }
-    _thread_pool = thread_pool;
-}
-
-std::shared_ptr<S3FileBuffer> S3FileBufferPool::allocate(bool reserve) {
-    std::shared_ptr<S3FileBuffer> buf = 
std::make_shared<S3FileBuffer>(_thread_pool);
-    // if need reserve then we must ensure return buf with memory preserved
-    if (reserve) {
-        {
-            std::unique_lock<std::mutex> lck {_lock};
-            _cv.wait(lck, [this]() { return !_free_raw_buffers.empty(); });
-            buf->reserve_buffer(_free_raw_buffers.front());
-            _free_raw_buffers.pop_front();
-        }
-        return buf;
-    }
-    // try to get one memory reserved buffer
-    {
-        std::unique_lock<std::mutex> lck {_lock};
-        if (!_free_raw_buffers.empty()) {
-            buf->reserve_buffer(_free_raw_buffers.front());
-            _free_raw_buffers.pop_front();
-        }
-    }
-    // if there is no free buffer and no need to reserve memory, we could 
return one empty buffer
-    // if the buf has no memory reserved, it would try to write the data to 
file cache first
-    // or it would try to rob buffer from other S3FileBuffer
-    return buf;
-}
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/fs/s3_file_write_bufferpool.h 
b/be/src/io/fs/s3_file_write_bufferpool.h
deleted file mode 100644
index 7e8bf01e19f..00000000000
--- a/be/src/io/fs/s3_file_write_bufferpool.h
+++ /dev/null
@@ -1,150 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <condition_variable>
-#include <cstdint>
-#include <fstream>
-#include <functional>
-#include <list>
-#include <memory>
-#include <mutex>
-
-#include "common/config.h"
-#include "common/status.h"
-#include "io/fs/s3_common.h"
-#include "runtime/exec_env.h"
-#include "util/slice.h"
-
-namespace doris {
-class ThreadPool;
-namespace io {
-
-// TODO(AlexYue): 1. support write into cache 2. unify write buffer and read 
buffer
-struct S3FileBuffer : public std::enable_shared_from_this<S3FileBuffer> {
-    using Callback = std::function<void()>;
-
-    S3FileBuffer(ThreadPool* pool) { _thread_pool = pool; }
-    ~S3FileBuffer() = default;
-
-    void rob_buffer(std::shared_ptr<S3FileBuffer>& other) {
-        _buf = other->_buf;
-        // we should clear other's memory buffer in case it woule be reclaimed 
twice
-        // when calling on_finished
-        other->_buf.clear();
-    }
-
-    void reserve_buffer(Slice s) { _buf = s; }
-
-    // append data into the memory buffer inside
-    // or into the file cache if the buffer has no memory buffer
-    void append_data(const Slice& data);
-    // upload to S3 and file cache in async threadpool
-    void submit();
-    // set the callback to upload to S3 file
-    void set_upload_remote_callback(Callback cb) { _upload_to_remote_callback 
= std::move(cb); }
-    // set callback to do task sync for the caller
-    void set_finish_upload(Callback cb) { _on_finish_upload = std::move(cb); }
-    // set cancel callback to indicate if the whole task is cancelled or not
-    void set_is_cancel(std::function<bool()> cb) { _is_cancelled = 
std::move(cb); }
-    // set callback to notify all the tasks that the whole procedure could be 
cancelled
-    // if this buffer's task failed
-    void set_on_failed(std::function<void(Status)> cb) { _on_failed = 
std::move(cb); }
-    // reclaim this buffer when task is done
-    void on_finished();
-    // set the status of the caller if task failed
-    void set_status(Status s) { _status = std::move(s); }
-    // get the size of the content already appendded
-    size_t get_size() const { return _size; }
-    // get the underlying stream containing
-    const std::shared_ptr<std::iostream>& get_stream() const { return 
_stream_ptr; }
-    // get file offset corresponding to the buffer
-    size_t get_file_offset() const { return _offset; }
-    // set the offset of the buffer
-    void set_file_offset(size_t offset) { _offset = offset; }
-    // reset this buffer to be reused
-    void reset() {
-        _upload_to_remote_callback = nullptr;
-        _is_cancelled = nullptr;
-        _on_failed = nullptr;
-        _on_finish_upload = nullptr;
-        _offset = 0;
-        _size = 0;
-    }
-
-    Callback _upload_to_remote_callback = nullptr;
-    // to control the callback control flow
-    // 1. read from cache if the data is written to cache first
-    // 2. upload content of buffer to S3
-    // 3. upload content to file cache if necessary
-    // 4. call the finish callback caller specified
-    // 5. reclaim self
-    void _on_upload() {
-        _upload_to_remote_callback();
-        _on_finish_upload();
-        on_finished();
-    };
-    // the caller might be cancelled
-    std::function<bool()> _is_cancelled = []() { return false; };
-    // set the caller to be failed
-    std::function<void(Status)> _on_failed = nullptr;
-    // caller of this buf could use this callback to do syncronization
-    Callback _on_finish_upload = nullptr;
-    Status _status;
-    size_t _offset {0};
-    size_t _size {0};
-    std::shared_ptr<std::iostream> _stream_ptr;
-    // only served as one reserved buffer
-    Slice _buf;
-    size_t _append_offset {0};
-    // not owned
-    ThreadPool* _thread_pool = nullptr;
-};
-
-class S3FileBufferPool {
-public:
-    S3FileBufferPool() = default;
-    ~S3FileBufferPool() = default;
-
-    // should be called one and only once
-    // at startup
-    void init(int32_t s3_write_buffer_whole_size, int32_t s3_write_buffer_size,
-              doris::ThreadPool* thread_pool);
-
-    static S3FileBufferPool* GetInstance() {
-        return ExecEnv::GetInstance()->get_s3_file_buffer_pool();
-    }
-
-    void reclaim(Slice buf) {
-        std::unique_lock<std::mutex> lck {_lock};
-        _free_raw_buffers.emplace_front(buf);
-        _cv.notify_all();
-    }
-
-    std::shared_ptr<S3FileBuffer> allocate(bool reserve = false);
-
-private:
-    std::mutex _lock;
-    std::condition_variable _cv;
-    std::unique_ptr<char[]> _whole_mem_buffer;
-    std::list<Slice> _free_raw_buffers;
-    // not owned
-    ThreadPool* _thread_pool = nullptr;
-};
-} // namespace io
-} // namespace doris
diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp
index b844e122a80..0c34a9cc5e7 100644
--- a/be/src/io/fs/s3_file_writer.cpp
+++ b/be/src/io/fs/s3_file_writer.cpp
@@ -45,10 +45,13 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "io/cache/block/block_file_cache.h"
+#include "io/cache/block/block_file_cache_factory.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/path.h"
+#include "io/fs/s3_file_bufferpool.h"
 #include "io/fs/s3_file_system.h"
-#include "io/fs/s3_file_write_bufferpool.h"
+#include "util/debug_points.h"
 #include "util/defer_op.h"
 #include "util/doris_metrics.h"
 #include "util/runtime_profile.h"
@@ -85,11 +88,19 @@ S3FileWriter::S3FileWriter(std::string key, 
std::shared_ptr<S3FileSystem> fs,
         : FileWriter(fmt::format("s3://{}/{}", fs->s3_conf().bucket, key), fs),
           _bucket(fs->s3_conf().bucket),
           _key(std::move(key)),
-          _client(fs->get_client()) {
+          _client(fs->get_client()),
+          _cache(nullptr),
+          _expiration_time(opts ? opts->file_cache_expiration : 0),
+          _is_cold_data(opts ? opts->is_cold_data : true),
+          _write_file_cache(opts ? opts->write_file_cache : false) {
     s3_file_writer_total << 1;
     s3_file_being_written << 1;
 
     Aws::Http::SetCompliantRfc3986Encoding(true);
+    if (config::enable_file_cache && _write_file_cache) {
+        _cache_key = IFileCache::hash(_path.filename().native());
+        _cache = FileCacheFactory::instance()->get_by_path(_cache_key);
+    }
 }
 
 S3FileWriter::~S3FileWriter() {
@@ -100,11 +111,6 @@ S3FileWriter::~S3FileWriter() {
         _bytes_written = 0;
     }
     s3_bytes_written_total << _bytes_written;
-    CHECK(_closed) << ", closed: " << _closed;
-    // in case there are task which might run after this object is destroyed
-    // for example, if the whole task failed and some task are still pending
-    // in threadpool
-    _wait_until_finish("dtor");
     s3_file_being_written << -1;
 }
 
@@ -112,6 +118,12 @@ Status S3FileWriter::_create_multi_upload_request() {
     CreateMultipartUploadRequest create_request;
     create_request.WithBucket(_bucket).WithKey(_key);
     create_request.SetContentType("application/octet-stream");
+    DBUG_EXECUTE_IF("s3_file_writer::_create_multi_upload_request", {
+        return Status::IOError(
+                "failed to create multipart upload(bucket={}, key={}, 
upload_id={}): injected "
+                "error",
+                _bucket, _path.native(), _upload_id);
+    });
 
     auto outcome = _client->CreateMultipartUpload(create_request);
     s3_bvar::s3_multi_part_upload_total << 1;
@@ -150,14 +162,14 @@ Status S3FileWriter::abort() {
     }
     // we need to reclaim the memory
     if (_pending_buf) {
-        _pending_buf->on_finished();
+        _pending_buf->on_finish();
         _pending_buf = nullptr;
     }
+    LOG(INFO) << "S3FileWriter::abort, path: " << _path.native();
     // upload id is empty means there was no create multi upload
     if (_upload_id.empty()) {
         return Status::OK();
     }
-    VLOG_DEBUG << "S3FileWriter::abort, path: " << _path.native();
     _wait_until_finish("Abort");
     AbortMultipartUploadRequest request;
     request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
@@ -190,13 +202,18 @@ Status S3FileWriter::close() {
     // it might be one file less than 5MB, we do upload here
     if (_pending_buf != nullptr) {
         if (_upload_id.empty()) {
-            _pending_buf->set_upload_remote_callback(
-                    [this, buf = _pending_buf]() { _put_object(*buf); });
+            auto buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+            DCHECK(buf != nullptr);
+            buf->set_upload_to_remote([this](UploadFileBuffer& b) { 
_put_object(b); });
         }
         _countdown_event.add_count();
         _pending_buf->submit();
         _pending_buf = nullptr;
     }
+    DBUG_EXECUTE_IF("s3_file_writer::close", {
+        static_cast<void>(_complete());
+        return Status::InternalError("failed to close s3 file writer");
+    });
     RETURN_IF_ERROR(_complete());
 
     return Status::OK();
@@ -205,6 +222,8 @@ Status S3FileWriter::close() {
 Status S3FileWriter::appendv(const Slice* data, size_t data_cnt) {
     DCHECK(!_closed);
     size_t buffer_size = config::s3_write_buffer_size;
+    DBUG_EXECUTE_IF("s3_file_writer::appendv",
+                    { return Status::InternalError("failed to append data"); 
});
     for (size_t i = 0; i < data_cnt; i++) {
         size_t data_size = data[i].get_size();
         for (size_t pos = 0, data_size_to_append = 0; pos < data_size; pos += 
data_size_to_append) {
@@ -212,23 +231,49 @@ Status S3FileWriter::appendv(const Slice* data, size_t 
data_cnt) {
                 return _st;
             }
             if (!_pending_buf) {
-                _pending_buf = S3FileBufferPool::GetInstance()->allocate();
-                // capture part num by value along with the value of the 
shared ptr
-                _pending_buf->set_upload_remote_callback(
-                        [part_num = _cur_part_num, this, cur_buf = 
_pending_buf]() {
-                            _upload_one_part(part_num, *cur_buf);
-                        });
-                _pending_buf->set_file_offset(_bytes_appended);
-                // later we might need to wait all prior tasks to be finished
-                _pending_buf->set_finish_upload([this]() { 
_countdown_event.signal(); });
-                _pending_buf->set_is_cancel([this]() { return _failed.load(); 
});
-                _pending_buf->set_on_failed([this, part_num = 
_cur_part_num](Status st) {
-                    VLOG_NOTICE << "failed at key: " << _key << ", load part " 
<< part_num
-                                << ", st " << st.to_string();
-                    std::unique_lock<std::mutex> _lck {_completed_lock};
-                    this->_st = std::move(st);
-                    _failed = true;
-                });
+                auto builder = FileBufferBuilder();
+                builder.set_type(BufferType::UPLOAD)
+                        .set_upload_callback(
+                                [part_num = _cur_part_num, 
this](UploadFileBuffer& buf) {
+                                    _upload_one_part(part_num, buf);
+                                })
+                        .set_file_offset(_bytes_appended)
+                        .set_index_offset(_index_offset)
+                        .set_sync_after_complete_task([this, part_num = 
_cur_part_num](Status s) {
+                            bool ret = false;
+                            if (!s.ok()) [[unlikely]] {
+                                VLOG_NOTICE << "failed at key: " << _key << ", 
load part "
+                                            << part_num << ", st " << s;
+                                std::unique_lock<std::mutex> _lck 
{_completed_lock};
+                                _failed = true;
+                                ret = true;
+                                this->_st = std::move(s);
+                            }
+                            // After the signal, there is a scenario where the 
previous invocation of _wait_until_finish
+                            // returns to the caller, and subsequently, the S3 
file writer is destructed.
+                            // This means that accessing _failed afterwards 
would result in a heap use after free vulnerability.
+                            _countdown_event.signal();
+                            return ret;
+                        })
+                        .set_is_cancelled([this]() { return _failed.load(); });
+                if (_write_file_cache) {
+                    // We would load the data into file cache asynchronously 
which indicates
+                    // that this instance of S3FileWriter might have been 
destructed when we
+                    // try to do writing into file cache, so we make the 
lambda capture the variable
+                    // we need by value to extend their lifetime
+                    builder.set_allocate_file_segments_holder(
+                            [cache = _cache, k = _cache_key, offset = 
_bytes_appended,
+                             t = _expiration_time, cold = _is_cold_data]() -> 
FileBlocksHolderPtr {
+                                CacheContext ctx;
+                                ctx.cache_type = t == 0 ? CacheType::NORMAL : 
CacheType::TTL;
+                                ctx.expiration_time = t;
+                                ctx.is_cold_data = cold;
+                                auto holder = cache->get_or_set(k, offset,
+                                                                
config::s3_write_buffer_size, ctx);
+                                return 
std::make_unique<FileBlocksHolder>(std::move(holder));
+                            });
+                }
+                _pending_buf = builder.build();
             }
             // we need to make sure all parts except the last one to be 5MB or 
more
             // and shouldn't be larger than buf
@@ -237,7 +282,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t 
data_cnt) {
 
             // if the buffer has memory buf inside, the data would be written 
into memory first then S3 then file cache
             // it would be written to cache then S3 if the buffer doesn't have 
memory preserved
-            _pending_buf->append_data(Slice {data[i].get_data() + pos, 
data_size_to_append});
+            RETURN_IF_ERROR(_pending_buf->append_data(
+                    Slice {data[i].get_data() + pos, data_size_to_append}));
 
             // if it's the last part, it could be less than 5MB, or it must
             // satisfy that the size is larger than or euqal to 5MB
@@ -259,8 +305,8 @@ Status S3FileWriter::appendv(const Slice* data, size_t 
data_cnt) {
     return Status::OK();
 }
 
-void S3FileWriter::_upload_one_part(int64_t part_num, S3FileBuffer& buf) {
-    if (buf._is_cancelled()) {
+void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {
+    if (buf.is_cancelled()) {
         return;
     }
     UploadPartRequest upload_request;
@@ -279,13 +325,25 @@ void S3FileWriter::_upload_one_part(int64_t part_num, 
S3FileBuffer& buf) {
     s3_bvar::s3_multi_part_upload_total << 1;
 
     UploadPartOutcome upload_part_outcome = upload_part_callable.get();
+    DBUG_EXECUTE_IF("s3_file_writer::_upload_one_part", {
+        if (part_num > 1) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(500));
+            auto s = Status::IOError(
+                    "failed to upload part (bucket={}, key={}, part_num={}, 
up_load_id={}): "
+                    "injected error",
+                    _bucket, _path.native(), part_num, _upload_id);
+            LOG_WARNING(s.to_string());
+            buf.set_val(s);
+            return;
+        }
+    });
     if (!upload_part_outcome.IsSuccess()) {
         auto s = Status::IOError(
                 "failed to upload part (bucket={}, key={}, part_num={}, 
up_load_id={}): {}",
                 _bucket, _path.native(), part_num, _upload_id,
                 upload_part_outcome.GetError().GetMessage());
         LOG_WARNING(s.to_string());
-        buf._on_failed(s);
+        buf.set_val(s);
         return;
     }
     s3_bytes_written_total << buf.get_size();
@@ -315,16 +373,41 @@ Status S3FileWriter::_complete() {
     
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);
 
     _wait_until_finish("Complete");
+    DBUG_EXECUTE_IF("s3_file_writer::_complete:1", { _cur_part_num++; });
+    if (_failed || _completed_parts.size() != _cur_part_num) {
+        auto st = Status::IOError("error status {}, complete parts {}, cur 
part num {}", _st,
+                                  _completed_parts.size(), _cur_part_num);
+        LOG(WARNING) << st;
+        _st = st;
+        return st;
+    }
     // make sure _completed_parts are ascending order
     std::sort(_completed_parts.begin(), _completed_parts.end(),
               [](auto& p1, auto& p2) { return p1->GetPartNumber() < 
p2->GetPartNumber(); });
+    DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
+                    { _completed_parts.back()->SetPartNumber(10 * 
_completed_parts.size()); });
     CompletedMultipartUpload completed_upload;
-    for (auto& part : _completed_parts) {
-        completed_upload.AddParts(*part);
+    for (size_t i = 0; i < _completed_parts.size(); i++) {
+        if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
+            auto st = Status::IOError(
+                    "error status {}, part num not continous, expected num {}, 
actual num {}", _st,
+                    i + 1, _completed_parts[i]->GetPartNumber());
+            LOG(WARNING) << st;
+            _st = st;
+            return st;
+        }
+        completed_upload.AddParts(*_completed_parts[i]);
     }
 
     complete_request.WithMultipartUpload(completed_upload);
 
+    DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
+        auto s = Status::IOError(
+                "failed to create complete multi part upload (bucket={}, 
key={}): injected error",
+                _bucket, _path.native());
+        LOG_WARNING(s.to_string());
+        return s;
+    });
     auto compute_outcome = _client->CompleteMultipartUpload(complete_request);
     s3_bvar::s3_multi_part_upload_total << 1;
 
@@ -341,14 +424,17 @@ Status S3FileWriter::_complete() {
 
 Status S3FileWriter::finalize() {
     DCHECK(!_closed);
+    DBUG_EXECUTE_IF("s3_file_writer::finalize",
+                    { return Status::IOError("failed to finalize due to 
injected error"); });
     // submit pending buf if it's not nullptr
     // it's the last buf, we can submit it right now
     if (_pending_buf != nullptr) {
         // if we only need to upload one file less than 5MB, we can just
         // call PutObject to reduce the network IO
         if (_upload_id.empty()) {
-            _pending_buf->set_upload_remote_callback(
-                    [this, buf = _pending_buf]() { _put_object(*buf); });
+            auto buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
+            DCHECK(buf != nullptr);
+            buf->set_upload_to_remote([this](UploadFileBuffer& b) { 
_put_object(b); });
         }
         _countdown_event.add_count();
         _pending_buf->submit();
@@ -358,7 +444,7 @@ Status S3FileWriter::finalize() {
     return _st;
 }
 
-void S3FileWriter::_put_object(S3FileBuffer& buf) {
+void S3FileWriter::_put_object(UploadFileBuffer& buf) {
     DCHECK(!_closed) << "closed " << _closed;
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_bucket).WithKey(_key);
@@ -367,6 +453,12 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
     request.SetBody(buf.get_stream());
     request.SetContentLength(buf.get_size());
     request.SetContentType("application/octet-stream");
+    DBUG_EXECUTE_IF("s3_file_writer::_put_object", {
+        _st = Status::InternalError("failed to put object");
+        buf.set_val(_st);
+        LOG(WARNING) << _st;
+        return;
+    });
     auto response = _client->PutObject(request);
     s3_bvar::s3_put_total << 1;
     if (!response.IsSuccess()) {
@@ -374,7 +466,7 @@ void S3FileWriter::_put_object(S3FileBuffer& buf) {
                                     response.GetError().GetExceptionName(),
                                     response.GetError().GetMessage(),
                                     
static_cast<int>(response.GetError().GetResponseCode()));
-        buf._on_failed(_st);
+        buf.set_val(_st);
         LOG(WARNING) << _st;
         return;
     }
diff --git a/be/src/io/fs/s3_file_writer.h b/be/src/io/fs/s3_file_writer.h
index 4fb64b2d00f..b8d53cf6482 100644
--- a/be/src/io/fs/s3_file_writer.h
+++ b/be/src/io/fs/s3_file_writer.h
@@ -27,6 +27,7 @@
 #include "io/fs/file_system.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/path.h"
+#include "io/fs/s3_file_bufferpool.h"
 
 namespace Aws::S3 {
 namespace Model {
@@ -58,8 +59,8 @@ private:
     void _wait_until_finish(std::string_view task_name);
     Status _complete();
     Status _create_multi_upload_request();
-    void _put_object(S3FileBuffer& buf);
-    void _upload_one_part(int64_t part_num, S3FileBuffer& buf);
+    void _put_object(UploadFileBuffer& buf);
+    void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);
 
     std::string _bucket;
     std::string _key;
@@ -68,20 +69,26 @@ private:
 
     std::shared_ptr<Aws::S3::S3Client> _client;
     std::string _upload_id;
+    size_t _index_offset {0};
 
     // Current Part Num for CompletedPart
     int _cur_part_num = 1;
     std::mutex _completed_lock;
     std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>> 
_completed_parts;
 
+    IFileCache::Key _cache_key;
+    IFileCache* _cache;
     // **Attention** call add_count() before submitting buf to async thread 
pool
     bthread::CountdownEvent _countdown_event {0};
 
     std::atomic_bool _failed = false;
-    Status _st = Status::OK();
+    Status _st;
     size_t _bytes_written = 0;
 
-    std::shared_ptr<S3FileBuffer> _pending_buf = nullptr;
+    std::shared_ptr<FileBuffer> _pending_buf;
+    int64_t _expiration_time;
+    bool _is_cold_data;
+    bool _write_file_cache;
 };
 
 } // namespace io
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index bef81e6256b..2d7766e714a 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -53,6 +53,8 @@ struct IOContext {
     bool is_disposable = false;
     bool is_index_data = false;
     bool read_file_cache = true;
+    // TODO(lightman): use the following member variables to control the file cache
+    bool is_persistent = false;
     int64_t expiration_time = 0;
     const TUniqueId* query_id = nullptr;             // Ref
     FileCacheStatistics* file_cache_stats = nullptr; // Ref
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 9fffd859ba3..0f7a2e64b17 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -162,6 +162,7 @@ public:
     ThreadPool* buffered_reader_prefetch_thread_pool() {
         return _buffered_reader_prefetch_thread_pool.get();
     }
+    ThreadPool* s3_file_upload_thread_pool() { return 
_s3_file_upload_thread_pool.get(); }
     ThreadPool* send_report_thread_pool() { return 
_send_report_thread_pool.get(); }
     ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); 
}
 
@@ -309,6 +310,8 @@ private:
     std::unique_ptr<ThreadPool> _download_cache_thread_pool;
     // Threadpool used to prefetch remote file for buffered reader
     std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
+    // Threadpool used to upload local file to s3
+    std::unique_ptr<ThreadPool> _s3_file_upload_thread_pool;
     // A token used to submit download cache task serially
     std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
     // Pool used by fragment manager to send profile or status to FE 
coordinator
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 646c8fca2be..36502585426 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -39,7 +39,7 @@
 #include "common/status.h"
 #include "io/cache/block/block_file_cache_factory.h"
 #include "io/fs/file_meta_cache.h"
-#include "io/fs/s3_file_write_bufferpool.h"
+#include "io/fs/s3_file_bufferpool.h"
 #include "olap/memtable_memory_limiter.h"
 #include "olap/olap_define.h"
 #include "olap/options.h"
@@ -173,6 +173,11 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
                               .set_max_threads(64)
                               .build(&_buffered_reader_prefetch_thread_pool));
 
+    static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
+                              .set_min_threads(16)
+                              .set_max_threads(64)
+                              .build(&_s3_file_upload_thread_pool));
+
     // min num equal to fragment pool's min num
     // max num is useless because it will start as many as requested in the 
past
     // queue size is useless because the max thread num is very large
@@ -244,7 +249,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths,
     // S3 buffer pool
     _s3_buffer_pool = new io::S3FileBufferPool();
     _s3_buffer_pool->init(config::s3_write_buffer_whole_size, 
config::s3_write_buffer_size,
-                          this->buffered_reader_prefetch_thread_pool());
+                          this->s3_file_upload_thread_pool());
 
     // Storage engine
     doris::EngineOptions options;
@@ -548,6 +553,7 @@ void ExecEnv::destroy() {
     _stream_load_executor.reset();
     SAFE_STOP(_storage_engine);
     SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
+    SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
     SAFE_SHUTDOWN(_join_node_thread_pool);
     SAFE_SHUTDOWN(_send_report_thread_pool);
     SAFE_SHUTDOWN(_send_batch_thread_pool);
@@ -613,6 +619,7 @@ void ExecEnv::destroy() {
     _join_node_thread_pool.reset(nullptr);
     _send_report_thread_pool.reset(nullptr);
     _buffered_reader_prefetch_thread_pool.reset(nullptr);
+    _s3_file_upload_thread_pool.reset(nullptr);
     _send_batch_thread_pool.reset(nullptr);
 
     SAFE_DELETE(_broker_client_cache);
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index 61759147e8c..33fe22b12e7 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -60,7 +60,6 @@
 #include "common/signal_handler.h"
 #include "common/status.h"
 #include "io/cache/block/block_file_cache_factory.h"
-#include "io/fs/s3_file_write_bufferpool.h"
 #include "olap/options.h"
 #include "olap/storage_engine.h"
 #include "runtime/exec_env.h"
diff --git a/be/test/io/fs/remote_file_system_test.cpp 
b/be/test/io/fs/remote_file_system_test.cpp
index a24d4e3932b..c5d80d1b65d 100644
--- a/be/test/io/fs/remote_file_system_test.cpp
+++ b/be/test/io/fs/remote_file_system_test.cpp
@@ -410,11 +410,11 @@ TEST_F(RemoteFileSystemTest, TestHdfsFileSystem) {
 
 TEST_F(RemoteFileSystemTest, TestS3FileSystem) {
     std::unique_ptr<ThreadPool> _pool;
-    ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+    ThreadPoolBuilder("S3FileUploadThreadPool")
             .set_min_threads(5)
             .set_max_threads(10)
             .build(&_pool);
-    ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool = 
std::move(_pool);
+    ExecEnv::GetInstance()->_s3_file_upload_thread_pool = std::move(_pool);
     S3Conf s3_conf;
     S3URI s3_uri(s3_location);
     CHECK_STATUS_OK(s3_uri.parse());
diff --git a/be/test/io/fs/s3_file_writer_test.cpp 
b/be/test/io/fs/s3_file_writer_test.cpp
new file mode 100644
index 00000000000..a1df6a1ea8b
--- /dev/null
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -0,0 +1,479 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/CompletedPart.h>
+#include <aws/s3/model/UploadPartRequest.h>
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <chrono>
+#include <cstdlib>
+#include <memory>
+#include <thread>
+
+#include "common/config.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_writer.h"
+#include "io/fs/local_file_system.h"
+#include "io/fs/s3_file_bufferpool.h"
+#include "io/fs/s3_file_system.h"
+#include "io/io_common.h"
+#include "runtime/exec_env.h"
+#include "testutil/http_utils.h"
+#include "util/debug_points.h"
+#include "util/slice.h"
+#include "util/threadpool.h"
+namespace doris {
+
+static std::shared_ptr<io::S3FileSystem> s3_fs {nullptr};
+
+class S3FileWriterTest : public testing::Test {
+public:
+    static void SetUpTestSuite() {
+        S3Conf s3_conf;
+        config::enable_debug_points = true;
+        DebugPoints::instance()->clear();
+        s3_conf.ak = config::test_s3_ak;
+        s3_conf.sk = config::test_s3_sk;
+        s3_conf.endpoint = config::test_s3_endpoint;
+        s3_conf.region = config::test_s3_region;
+        s3_conf.bucket = config::test_s3_bucket;
+        s3_conf.prefix = "s3_file_writer_test";
+        static_cast<void>(
+                io::S3FileSystem::create(std::move(s3_conf), 
"s3_file_writer_test", &s3_fs));
+        std::cout << "s3 conf: " << s3_conf.to_string() << std::endl;
+        ASSERT_EQ(Status::OK(), s3_fs->connect());
+
+        std::unique_ptr<doris::ThreadPool> _s3_file_upload_thread_pool;
+        static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
+                                  .set_min_threads(16)
+                                  .set_max_threads(64)
+                                  .build(&_s3_file_upload_thread_pool));
+        ExecEnv::GetInstance()->_s3_file_upload_thread_pool =
+                std::move(_s3_file_upload_thread_pool);
+        ExecEnv::GetInstance()->_s3_buffer_pool = new io::S3FileBufferPool();
+        io::S3FileBufferPool::GetInstance()->init(
+                config::s3_write_buffer_whole_size, 
config::s3_write_buffer_size,
+                ExecEnv::GetInstance()->_s3_file_upload_thread_pool.get());
+    }
+
+    static void TearDownTestSuite() {
+        ExecEnv::GetInstance()->_s3_file_upload_thread_pool->shutdown();
+        ExecEnv::GetInstance()->_s3_file_upload_thread_pool = nullptr;
+        delete ExecEnv::GetInstance()->_s3_buffer_pool;
+        ExecEnv::GetInstance()->_s3_buffer_pool = nullptr;
+    }
+
+private:
+};
+
+TEST_F(S3FileWriterTest, multi_part_io_error) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_upload_one_part");
+        Defer defer {[&]() {
+            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_upload_one_part");
+        }};
+        auto client = s3_fs->get_client();
+        io::FileReaderSPtr local_file_reader;
+
+        ASSERT_TRUE(
+                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader)
+                        .ok());
+
+        constexpr int buf_size = 8192;
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("multi_part_io_error", 
&s3_file_writer, &state));
+
+        char buf[buf_size];
+        doris::Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        auto file_size = local_file_reader->size();
+        while (offset < file_size) {
+            ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+            ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf, 
bytes_read)));
+            offset += bytes_read;
+        }
+        ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+        ASSERT_TRUE(!s3_file_writer->finalize().ok());
+        // The second part would fail to upload itself to s3,
+        // so the result of close should not be ok
+        ASSERT_TRUE(!s3_file_writer->close().ok());
+        bool exits = false;
+        auto s = s3_fs->exists("multi_part_io_error", &exits);
+        LOG(INFO) << "status is " << s;
+        ASSERT_TRUE(!exits);
+    }
+}
+
+TEST_F(S3FileWriterTest, put_object_io_error) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_put_object");
+        Defer defer {[&]() {
+            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_put_object");
+        }};
+        auto client = s3_fs->get_client();
+        io::FileReaderSPtr local_file_reader;
+
+        ASSERT_TRUE(
+                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader)
+                        .ok());
+
+        constexpr int buf_size = 8192;
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("put_object_io_error", 
&s3_file_writer, &state));
+
+        char buf[buf_size];
+        Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        // Only upload 4MB to trigger put object operation
+        auto file_size = 4 * 1024 * 1024;
+        while (offset < file_size) {
+            ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+            ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf, 
bytes_read)));
+            offset += bytes_read;
+        }
+        ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+        ASSERT_TRUE(!s3_file_writer->finalize().ok());
+        // The put object request might time out but still succeed in uploading the object
+        ASSERT_TRUE(!s3_file_writer->close().ok());
+    }
+}
+
+TEST_F(S3FileWriterTest, appendv_random_quit) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        io::FileReaderSPtr local_file_reader;
+
+        ASSERT_TRUE(
+                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader)
+                        .ok());
+
+        constexpr int buf_size = 8192;
+        
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::appendv");
+        Defer defer {[&]() {
+            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::appendv");
+        }};
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("appendv_random_quit", 
&s3_file_writer, &state));
+
+        char buf[buf_size];
+        Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+        ASSERT_TRUE(!s3_file_writer->append(Slice(buf, bytes_read)).ok());
+        bool exits = false;
+        static_cast<void>(s3_fs->exists("appendv_random_quit", &exits));
+        ASSERT_TRUE(!exits);
+    }
+}
+
+TEST_F(S3FileWriterTest, multi_part_open_error) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        io::FileReaderSPtr local_file_reader;
+
+        ASSERT_TRUE(
+                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader)
+                        .ok());
+
+        constexpr int buf_size = 5 * 1024 * 1024;
+        POST_HTTP_TO_TEST_SERVER(
+                
"/api/debug_point/add/s3_file_writer::_create_multi_upload_request");
+        Defer defer {[&]() {
+            POST_HTTP_TO_TEST_SERVER(
+                    
"/api/debug_point/remove/s3_file_writer::_create_multi_upload_request");
+        }};
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(),
+                  s3_fs->create_file("multi_part_open_error", &s3_file_writer, 
&state));
+
+        auto buf = std::make_unique<char[]>(buf_size);
+        Slice slice(buf.get(), buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+        // Directly writing 5MB would trigger one create-multipart-upload request,
+        // and it would be rejected with an error
+        auto st = s3_file_writer->append(Slice(buf.get(), bytes_read));
+        ASSERT_TRUE(!st.ok());
+        bool exits = false;
+        static_cast<void>(s3_fs->exists("multi_part_open_error", &exits));
+        ASSERT_TRUE(!exits);
+    }
+}
+
+TEST_F(S3FileWriterTest, normal) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        io::FileReaderSPtr local_file_reader;
+
+        ASSERT_TRUE(
+                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader)
+                        .ok());
+
+        constexpr int buf_size = 8192;
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("normal", &s3_file_writer, 
&state));
+
+        char buf[buf_size];
+        Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        auto file_size = local_file_reader->size();
+        while (offset < file_size) {
+            ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+            ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf, 
bytes_read)));
+            offset += bytes_read;
+        }
+        ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+        ASSERT_TRUE(s3_file_writer->finalize().ok());
+        ASSERT_EQ(Status::OK(), s3_file_writer->close());
+        int64_t s3_file_size = 0;
+        ASSERT_EQ(Status::OK(), s3_fs->file_size("normal", &s3_file_size));
+        ASSERT_EQ(s3_file_size, file_size);
+    }
+}
+
+TEST_F(S3FileWriterTest, smallFile) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        io::FileReaderSPtr local_file_reader;
+
+        
ASSERT_TRUE(fs->open_file("./be/test/olap/test_data/all_types_1000.txt", 
&local_file_reader)
+                            .ok());
+
+        constexpr int buf_size = 8192;
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("small", &s3_file_writer, 
&state));
+
+        char buf[buf_size];
+        Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        auto file_size = local_file_reader->size();
+        while (offset < file_size) {
+            ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+            ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf, 
bytes_read)));
+            offset += bytes_read;
+        }
+        ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+        ASSERT_TRUE(s3_file_writer->finalize().ok());
+        ASSERT_EQ(Status::OK(), s3_file_writer->close());
+        int64_t s3_file_size = 0;
+        ASSERT_EQ(Status::OK(), s3_fs->file_size("small", &s3_file_size));
+        ASSERT_EQ(s3_file_size, file_size);
+    }
+}
+
+TEST_F(S3FileWriterTest, close_error) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        io::FileReaderSPtr local_file_reader;
+
+        
ASSERT_TRUE(fs->open_file("./be/test/olap/test_data/all_types_1000.txt", 
&local_file_reader)
+                            .ok());
+
+        POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::close");
+        Defer defer {[&]() {
+            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::close");
+        }};
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_TRUE(s3_fs->create_file("close_error", &s3_file_writer, 
&state).ok());
+        ASSERT_TRUE(!s3_file_writer->close().ok());
+        bool exits = false;
+        static_cast<void>(s3_fs->exists("close_error", &exits));
+        ASSERT_TRUE(!exits);
+    }
+}
+
+TEST_F(S3FileWriterTest, finalize_error) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        io::FileReaderSPtr local_file_reader;
+
+        
ASSERT_TRUE(fs->open_file("./be/test/olap/test_data/all_types_1000.txt", 
&local_file_reader)
+                            .ok());
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("finalize_error", 
&s3_file_writer, &state));
+
+        
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::finalize");
+        Defer defer {[&]() {
+            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::finalize");
+        }};
+
+        constexpr int buf_size = 8192;
+
+        char buf[buf_size];
+        Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        auto file_size = local_file_reader->size();
+        while (offset < file_size) {
+            ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+            ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf, 
bytes_read)));
+            offset += bytes_read;
+        }
+        ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+        ASSERT_TRUE(!s3_file_writer->finalize().ok());
+        bool exits = false;
+        static_cast<void>(s3_fs->exists("finalize_error", &exits));
+        ASSERT_TRUE(!exits);
+    }
+}
+
+TEST_F(S3FileWriterTest, multi_part_complete_error_2) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_complete:2");
+        Defer defer {[&]() {
+            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_complete:2");
+        }};
+        auto client = s3_fs->get_client();
+        io::FileReaderSPtr local_file_reader;
+
+        ASSERT_TRUE(
+                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader)
+                        .ok());
+
+        constexpr int buf_size = 8192;
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("multi_part_io_error", 
&s3_file_writer, &state));
+
+        char buf[buf_size];
+        Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        auto file_size = local_file_reader->size();
+        while (offset < file_size) {
+            ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+            ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf, 
bytes_read)));
+            offset += bytes_read;
+        }
+        ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+        ASSERT_TRUE(s3_file_writer->finalize().ok());
+        // The complete multipart upload request would fail,
+        // so the result of close should not be ok
+        auto st = s3_file_writer->close();
+        ASSERT_TRUE(!st.ok());
+        std::cout << st << std::endl;
+    }
+}
+
+TEST_F(S3FileWriterTest, multi_part_complete_error_1) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_complete:1");
+        Defer defer {[&]() {
+            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_complete:1");
+        }};
+        auto client = s3_fs->get_client();
+        io::FileReaderSPtr local_file_reader;
+
+        ASSERT_TRUE(
+                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader)
+                        .ok());
+
+        constexpr int buf_size = 8192;
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("multi_part_io_error", 
&s3_file_writer, &state));
+
+        char buf[buf_size];
+        Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        auto file_size = local_file_reader->size();
+        while (offset < file_size) {
+            ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+            ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf, 
bytes_read)));
+            offset += bytes_read;
+        }
+        ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+        ASSERT_TRUE(s3_file_writer->finalize().ok());
+        // The complete multipart upload request would fail,
+        // so the result of close should not be ok
+        auto st = s3_file_writer->close();
+        ASSERT_TRUE(!st.ok());
+        std::cout << st << std::endl;
+    }
+}
+
+TEST_F(S3FileWriterTest, multi_part_complete_error_3) {
+    doris::io::FileWriterOptions state;
+    auto fs = io::global_local_filesystem();
+    {
+        
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/s3_file_writer::_complete:3");
+        Defer defer {[&]() {
+            
POST_HTTP_TO_TEST_SERVER("/api/debug_point/remove/s3_file_writer::_complete:3");
+        }};
+        auto client = s3_fs->get_client();
+        io::FileReaderSPtr local_file_reader;
+
+        ASSERT_TRUE(
+                fs->open_file("./be/test/olap/test_data/all_types_100000.txt", 
&local_file_reader)
+                        .ok());
+
+        constexpr int buf_size = 8192;
+
+        io::FileWriterPtr s3_file_writer;
+        ASSERT_EQ(Status::OK(), s3_fs->create_file("multi_part_io_error", 
&s3_file_writer, &state));
+
+        char buf[buf_size];
+        Slice slice(buf, buf_size);
+        size_t offset = 0;
+        size_t bytes_read = 0;
+        auto file_size = local_file_reader->size();
+        while (offset < file_size) {
+            ASSERT_TRUE(local_file_reader->read_at(offset, slice, 
&bytes_read).ok());
+            ASSERT_EQ(Status::OK(), s3_file_writer->append(Slice(buf, 
bytes_read)));
+            offset += bytes_read;
+        }
+        ASSERT_EQ(s3_file_writer->bytes_appended(), file_size);
+        ASSERT_TRUE(s3_file_writer->finalize().ok());
+        // The complete multipart upload request would fail,
+        // so the result of close should not be ok
+        auto st = s3_file_writer->close();
+        ASSERT_TRUE(!st.ok());
+        std::cout << st << std::endl;
+    }
+}
+
+} // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to