This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch doris-for-zhongjin
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 6a1209c4e36d6929baf9a8ce8dd3473d007339b0
Author: Ashin Gau <[email protected]>
AuthorDate: Tue Apr 4 19:05:22 2023 +0800

    [opt](file_reader) add prefetch buffer to read csv&json file (#18301)
    
    Co-authored-by: ByteYue 
<[[email protected]](mailto:[email protected])>
    This PR is an optimization for https://github.com/apache/doris/pull/17478:
    1. Change the buffer size of `LineReader` to 4MB to align with the size of 
prefetch buffer.
    2. Lazily prefetch data in the first read to prevent wasted reading.
    3. S3 block size is 32MB only, which is too small for a file split. Set 
128MB as default file split size.
    4. Add `_end_offset` for prefetch buffer to prevent wasted reading.
    
    The query performance of reading data on object storage is improved by more 
than 3x+.
---
 be/src/io/fs/buffered_reader.cpp                   | 155 +++++++++++++++++++++
 be/src/io/fs/buffered_reader.h                     | 110 +++++++++++++++
 be/src/runtime/exec_env.h                          |   5 +
 be/src/runtime/exec_env_init.cpp                   |   5 +
 be/src/vec/exec/format/csv/csv_reader.cpp          |  16 ++-
 .../file_reader/new_plain_text_line_reader.cpp     |   3 +-
 be/src/vec/exec/format/json/new_json_reader.cpp    |  16 ++-
 .../main/java/org/apache/doris/common/Config.java  |   5 +-
 8 files changed, 309 insertions(+), 6 deletions(-)

diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 552b9b1a83..4f47ff5671 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -28,6 +28,161 @@
 namespace doris {
 namespace io {
 
+// there exists occasions where the buffer is already closed but
+// some prior tasks are still queued in thread pool, so we have to check 
whether
+// the buffer is closed each time the condition variable is notified.
+void PrefetchBuffer::reset_offset(size_t offset) {
+    if (UNLIKELY(offset >= _end_offset)) {
+        return;
+    }
+    {
+        std::unique_lock lck {_lock};
+        _prefetched.wait(lck, [this]() { return _buffer_status != 
BufferStatus::PENDING; });
+        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
+            _prefetched.notify_all();
+            return;
+        }
+        _buffer_status = BufferStatus::RESET;
+        _offset = offset;
+        _prefetched.notify_all();
+    }
+    
ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
+            [buffer_ptr = shared_from_this()]() { 
buffer_ptr->prefetch_buffer(); });
+}
+
+// only this function would run concurrently in another thread
+void PrefetchBuffer::prefetch_buffer() {
+    {
+        std::unique_lock lck {_lock};
+        _prefetched.wait(lck, [this]() {
+            return _buffer_status == BufferStatus::RESET || _buffer_status == 
BufferStatus::CLOSED;
+        });
+        // in case buffer is already closed
+        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
+            _prefetched.notify_all();
+            return;
+        }
+        _buffer_status = BufferStatus::PENDING;
+        _prefetched.notify_all();
+    }
+    _len = 0;
+    Status s;
+
+    size_t buf_size = _end_offset - _offset > _size ? _size : _end_offset - 
_offset;
+    s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len);
+    std::unique_lock lck {_lock};
+    _prefetched.wait(lck, [this]() { return _buffer_status == 
BufferStatus::PENDING; });
+    if (!s.ok() && _offset < _reader->size()) {
+        _prefetch_status = std::move(s);
+    }
+    _buffer_status = BufferStatus::PREFETCHED;
+    _prefetched.notify_all();
+    // eof would come up with len == 0, it would be handled by read_buffer
+}
+
+Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
+                                   size_t* bytes_read) {
+    if (UNLIKELY(off >= _end_offset)) {
+        // Reader can read out of [_start_offset, _end_offset) by synchronous 
method.
+        return _reader->read_at(off, Slice {out, buf_len}, bytes_read);
+    }
+    {
+        std::unique_lock lck {_lock};
+        // buffer must be prefetched or it's closed
+        _prefetched.wait(lck, [this]() {
+            return _buffer_status == BufferStatus::PREFETCHED ||
+                   _buffer_status == BufferStatus::CLOSED;
+        });
+        if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) {
+            return Status::OK();
+        }
+    }
+    RETURN_IF_ERROR(_prefetch_status);
+    // there is only parquet would do not sequence read
+    // it would read the end of the file first
+    if (UNLIKELY(!contains(off))) {
+        reset_offset((off / _size) * _size);
+        return read_buffer(off, out, buf_len, bytes_read);
+    }
+    if (UNLIKELY(0 == _len || _offset + _len < off)) {
+        return Status::OK();
+    }
+    // [0]: maximum len trying to read, [1] maximum length buffer can provide, 
[2] actual len buffer has
+    size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len 
- off});
+    memcpy((void*)out, _buf.data() + (off - _offset), read_len);
+    *bytes_read = read_len;
+    if (off + *bytes_read == _offset + _len) {
+        reset_offset(_offset + _whole_buffer_size);
+    }
+    return Status::OK();
+}
+
+void PrefetchBuffer::close() {
+    std::unique_lock lck {_lock};
+    // in case _reader still tries to write to the buf after we close the 
buffer
+    _prefetched.wait(lck, [this]() { return _buffer_status != 
BufferStatus::PENDING; });
+    _buffer_status = BufferStatus::CLOSED;
+    _prefetched.notify_all();
+}
+
+// buffered reader
+PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, 
int64_t offset,
+                                               int64_t length, int64_t 
buffer_size)
+        : _reader(std::move(reader)), _start_offset(offset), 
_end_offset(offset + length) {
+    if (buffer_size == -1L) {
+        buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
+    }
+    _size = _reader->size();
+    _whole_pre_buffer_size = buffer_size;
+    int buffer_num = buffer_size > s_max_pre_buffer_size ? buffer_size / 
s_max_pre_buffer_size : 1;
+    // set the _cur_offset of this reader as same as the inner reader's,
+    // to make sure the buffer reader will start to read at right position.
+    for (int i = 0; i < buffer_num; i++) {
+        _pre_buffers.emplace_back(
+                std::make_shared<PrefetchBuffer>(_start_offset, _end_offset, 
s_max_pre_buffer_size,
+                                                 _whole_pre_buffer_size, 
_reader.get()));
+    }
+}
+
+PrefetchBufferedReader::~PrefetchBufferedReader() {
+    close();
+    _closed = true;
+}
+
+Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, 
size_t* bytes_read,
+                                            const IOContext* io_ctx) {
+    if (!_initialized) {
+        reset_all_buffer(offset);
+        _initialized = true;
+    }
+    if (UNLIKELY(result.get_size() == 0 || offset >= size())) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    size_t nbytes = result.get_size();
+    int actual_bytes_read = 0;
+    while (actual_bytes_read < nbytes && offset < size()) {
+        size_t read_num = 0;
+        auto buffer_pos = get_buffer_pos(offset);
+        RETURN_IF_ERROR(
+                _pre_buffers[buffer_pos]->read_buffer(offset, 
result.get_data() + actual_bytes_read,
+                                                      nbytes - 
actual_bytes_read, &read_num));
+        actual_bytes_read += read_num;
+        offset += read_num;
+    }
+    *bytes_read = actual_bytes_read;
+    return Status::OK();
+}
+
+Status PrefetchBufferedReader::close() {
+    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
+                  [](std::shared_ptr<PrefetchBuffer>& buffer) { 
buffer->close(); });
+    _reader->close();
+    _closed = true;
+
+    return Status::OK();
+}
+
 BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, 
uint64_t offset,
                                                    uint64_t length, size_t 
max_buf_size)
         : _file(file),
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 8b06721c1d..2b3b6054b9 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -19,8 +19,10 @@
 
 #include <stdint.h>
 
+#include <condition_variable>
 #include <memory>
 
+#include "common/config.h"
 #include "common/status.h"
 #include "io/fs/file_reader.h"
 #include "olap/olap_define.h"
@@ -29,6 +31,114 @@
 namespace doris {
 namespace io {
 
+class PrefetchBufferedReader;
+struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> {
+    enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
+    PrefetchBuffer() = default;
+    PrefetchBuffer(size_t start_offset, size_t end_offset, size_t buffer_size,
+                   size_t whole_buffer_size, io::FileReader* reader)
+            : _start_offset(start_offset),
+              _end_offset(end_offset),
+              _size(buffer_size),
+              _whole_buffer_size(whole_buffer_size),
+              _reader(reader),
+              _buf(buffer_size, '0') {}
+    PrefetchBuffer(PrefetchBuffer&& other)
+            : _offset(other._offset),
+              _start_offset(other._start_offset),
+              _end_offset(other._end_offset),
+              _size(other._size),
+              _whole_buffer_size(other._whole_buffer_size),
+              _reader(other._reader),
+              _buf(std::move(other._buf)) {}
+    ~PrefetchBuffer() = default;
+    size_t _offset;
+    // [_start_offset, _end_offset) is the range that can be prefetched.
+    // Notice that the reader can read out of [_start_offset, _end_offset), 
because FE does not align the file
+    // according to the format when splitting it.
+    size_t _start_offset;
+    size_t _end_offset;
+    size_t _size;
+    size_t _len {0};
+    size_t _whole_buffer_size;
+    io::FileReader* _reader;
+    std::string _buf;
+    BufferStatus _buffer_status {BufferStatus::RESET};
+    std::mutex _lock;
+    std::condition_variable _prefetched;
+    Status _prefetch_status {Status::OK()};
+    // @brief: reset the start offset of this buffer to offset
+    // @param: the new start offset for this buffer
+    void reset_offset(size_t offset);
+    // @brief: start to fetch the content between [_offset, _offset + _size)
+    void prefetch_buffer();
+    // @brief: used by BufferedReader to read the prefetched data
+    // @param[off] read start address
+    // @param[buf] buffer to put the actual content
+    // @param[buf_len] maximum len trying to read
+    // @param[bytes_read] actual bytes read
+    Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* 
bytes_read);
+    // @brief: shut down the buffer until the prior prefetching task is done
+    void close();
+    // @brief: to detect whether this buffer contains off
+    // @param[off] detect offset
+    bool inline contains(size_t off) const { return _offset <= off && off < 
_offset + _size; }
+};
+
+/**
+ * A buffered reader that prefetch data in the daemon thread pool.
+ * The data is prefetched sequentially until the underlying buffers(4 * 4M as 
default) are full.
+ * When a buffer is read out, it will fetch data backward in daemon, so the 
underlying reader should be
+ * thread-safe, and the access mode of data needs to be sequential.
+ * Therefore, PrefetchBufferedReader now only support csv&json format when 
reading s3&broker file.
+ */
+class PrefetchBufferedReader : public io::FileReader {
+public:
+    PrefetchBufferedReader(io::FileReaderSPtr reader, int64_t offset, int64_t 
length,
+                           int64_t buffer_size = -1L);
+    ~PrefetchBufferedReader() override;
+
+    Status close() override;
+
+    const io::Path& path() const override { return _reader->path(); }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    std::shared_ptr<io::FileSystem> fs() const override { return 
_reader->fs(); }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const IOContext* io_ctx) override;
+
+private:
+    size_t get_buffer_pos(int64_t position) const {
+        return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
+    }
+    size_t get_buffer_offset(int64_t position) const {
+        return (position / s_max_pre_buffer_size) * s_max_pre_buffer_size;
+    }
+    void reset_all_buffer(size_t position) {
+        for (int64_t i = 0; i < _pre_buffers.size(); i++) {
+            int64_t cur_pos = position + i * s_max_pre_buffer_size;
+            int cur_buf_pos = get_buffer_pos(cur_pos);
+            // reset would do all the prefetch work
+            
_pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos));
+        }
+    }
+
+    io::FileReaderSPtr _reader;
+    int64_t _start_offset;
+    int64_t _end_offset;
+    int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB
+    std::vector<std::shared_ptr<PrefetchBuffer>> _pre_buffers;
+    int64_t _whole_pre_buffer_size;
+    bool _initialized = false;
+    bool _closed = false;
+    size_t _size;
+};
+
 /**
  * Load all the needed data in underlying buffer, so the caller does not need 
to prepare the data container.
  */
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4ce64ff153..5483f5496c 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -125,6 +125,9 @@ public:
     MemTrackerLimiter* experimental_mem_tracker() { return 
_experimental_mem_tracker.get(); }
     ThreadPool* send_batch_thread_pool() { return 
_send_batch_thread_pool.get(); }
     ThreadPool* download_cache_thread_pool() { return 
_download_cache_thread_pool.get(); }
+    ThreadPool* buffered_reader_prefetch_thread_pool() {
+        return _buffered_reader_prefetch_thread_pool.get();
+    }
     ThreadPool* send_report_thread_pool() { return 
_send_report_thread_pool.get(); }
     ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); 
}
 
@@ -215,6 +218,8 @@ private:
 
     // Threadpool used to download cache from remote storage
     std::unique_ptr<ThreadPool> _download_cache_thread_pool;
+    // Threadpool used to prefetch remote file for buffered reader
+    std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
     // A token used to submit download cache task serially
     std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
     // Pool used by fragment manager to send profile or status to FE 
coordinator
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 4287ed8034..b7066bbdbe 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -100,6 +100,11 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
 
     init_download_cache_required_components();
 
+    ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+            .set_min_threads(16)
+            .set_max_threads(64)
+            .build(&_buffered_reader_prefetch_thread_pool);
+
     // min num equal to fragment pool's min num
     // max num is useless because it will start as many as requested in the 
past
     // queue size is useless because the max thread num is very large
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 65d1fb355b..f11a5c2273 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -26,6 +26,9 @@
 #include "exec/text_converter.h"
 #include "exec/text_converter.hpp"
 #include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/s3_file_reader.h"
 #include "olap/iterators.h"
 #include "olap/olap_common.h"
 #include "util/string_util.h"
@@ -135,11 +138,20 @@ Status CsvReader::init_reader(bool is_load) {
 
     _file_description.start_offset = start_offset;
 
+    io::FileReaderSPtr csv_file_reader;
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader));
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&csv_file_reader));
     } else {
         RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, 
&_file_system, &_file_reader));
+                _profile, _system_properties, _file_description, 
&_file_system, &csv_file_reader));
+    }
+    if (typeid_cast<io::S3FileReader*>(csv_file_reader.get()) != nullptr ||
+        typeid_cast<io::BrokerFileReader*>(csv_file_reader.get()) != nullptr) {
+        // PrefetchBufferedReader now only support csv&json format when 
reading s3&broker file
+        _file_reader.reset(
+                new io::PrefetchBufferedReader(csv_file_reader, 
_range.start_offset, _range.size));
+    } else {
+        _file_reader = std::move(csv_file_reader);
     }
     if (_file_reader->size() == 0 && _params.file_type != 
TFileType::FILE_STREAM &&
         _params.file_type != TFileType::FILE_BROKER) {
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp 
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index d34b9ee5c8..3984e68f33 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -27,7 +27,8 @@
 //  larger than 300B for correct lzo header decompressing
 #define INPUT_CHUNK (2 * 1024 * 1024)
 // #define INPUT_CHUNK  (34)
-#define OUTPUT_CHUNK (8 * 1024 * 1024)
+// align with prefetch buffer size
+#define OUTPUT_CHUNK (4 * 1024 * 1024)
 // #define OUTPUT_CHUNK (32)
 // leave these 2 size small for debugging
 
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index b28a97cfdf..6ae0c74e85 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -20,6 +20,9 @@
 #include "common/compiler_util.h"
 #include "exprs/json_functions.h"
 #include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/s3_file_reader.h"
 #include "io/fs/stream_load_pipe.h"
 #include "olap/iterators.h"
 #include "runtime/descriptors.h"
@@ -332,11 +335,20 @@ Status NewJsonReader::_open_file_reader() {
     _current_offset = start_offset;
     _file_description.start_offset = start_offset;
 
+    io::FileReaderSPtr json_file_reader;
     if (_params.file_type == TFileType::FILE_STREAM) {
-        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&_file_reader));
+        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, 
&json_file_reader));
     } else {
         RETURN_IF_ERROR(FileFactory::create_file_reader(
-                _profile, _system_properties, _file_description, 
&_file_system, &_file_reader));
+                _profile, _system_properties, _file_description, 
&_file_system, &json_file_reader));
+    }
+    if (typeid_cast<io::S3FileReader*>(json_file_reader.get()) != nullptr ||
+        typeid_cast<io::BrokerFileReader*>(json_file_reader.get()) != nullptr) 
{
+        // PrefetchBufferedReader now only support csv&json format when 
reading s3&broker file
+        _file_reader.reset(
+                new io::PrefetchBufferedReader(json_file_reader, 
_range.start_offset, _range.size));
+    } else {
+        _file_reader = std::move(json_file_reader);
     }
     return Status::OK();
 }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 1960f8a97c..f8aa2c310e 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1742,8 +1742,11 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true, masterOnly = false)
     public static long file_scan_node_split_num = 128;
 
+    // 0 means use the block size in HDFS/S3 as split size.
+    // HDFS block size is 128MB, while S3 block size is 32MB.
+    // 32MB is too small for a S3 file split, so set 128MB as default split 
size.
     @ConfField(mutable = true, masterOnly = false)
-    public static long file_split_size = 0; // 0 means use the block size in 
HDFS/S3 as split size
+    public static long file_split_size = 134217728;
 
     /**
      * If set to TRUE, FE will:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to