This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 3c81bca5684 [opt](filecache) do not sync segment data into storage system #25691 (#25856)
3c81bca5684 is described below
commit 3c81bca5684ba9ad1f4c914f61b32f3c7bdf19e4
Author: Mingyu Chen <[email protected]>
AuthorDate: Wed Oct 25 16:49:11 2023 +0800
[opt](filecache) do not sync segment data into storage system #25691 (#25856)
---
be/src/exec/tablet_info.cpp | 10 +++----
be/src/io/cache/block/block_file_segment.cpp | 3 +-
be/src/io/fs/broker_file_system.cpp | 7 +++--
be/src/io/fs/broker_file_system.h | 3 +-
be/src/io/fs/file_system.cpp | 5 ++--
be/src/io/fs/file_system.h | 7 +++--
.../{local_file_writer.h => file_writer_options.h} | 32 ++++------------------
be/src/io/fs/hdfs_file_system.cpp | 5 ++--
be/src/io/fs/hdfs_file_system.h | 3 +-
be/src/io/fs/local_file_system.cpp | 7 +++--
be/src/io/fs/local_file_system.h | 3 +-
be/src/io/fs/local_file_writer.cpp | 6 ++--
be/src/io/fs/local_file_writer.h | 3 +-
be/src/io/fs/s3_file_system.cpp | 3 +-
be/src/io/fs/s3_file_system.h | 3 +-
be/test/olap/tablet_cooldown_test.cpp | 3 +-
16 files changed, 50 insertions(+), 53 deletions(-)
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index fb5719bed4b..8df90973b05 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -93,8 +93,7 @@ Status OlapTableSchemaParam::init(const
POlapTableSchemaParam& pschema) {
if (!_is_partial_update ||
_partial_update_input_columns.count(pcolumn_desc.name()) > 0) {
std::string col_type = has_invalid_type ? "INVALID_TYPE" :
pcolumn_desc.type();
- auto it = slots_map.find(
- std::make_pair(to_lower(pcolumn_desc.name()),
col_type));
+ auto it =
slots_map.find(std::make_pair(to_lower(pcolumn_desc.name()), col_type));
if (it == std::end(slots_map)) {
return Status::InternalError("unknown index column,
column={}, type={}",
pcolumn_desc.name(),
pcolumn_desc.type());
@@ -153,9 +152,10 @@ Status OlapTableSchemaParam::init(const
TOlapTableSchemaParam& tschema) {
index->index_id = t_index.id;
index->schema_hash = t_index.schema_hash;
for (auto& tcolumn_desc : t_index.columns_desc) {
- TPrimitiveType::type col_type = has_invalid_type ?
TPrimitiveType::INVALID_TYPE : tcolumn_desc.column_type.type;
- auto it =
slots_map.find(std::make_pair(to_lower(tcolumn_desc.column_name),
- thrift_to_type(col_type)));
+ TPrimitiveType::type col_type =
+ has_invalid_type ? TPrimitiveType::INVALID_TYPE :
tcolumn_desc.column_type.type;
+ auto it = slots_map.find(
+ std::make_pair(to_lower(tcolumn_desc.column_name),
thrift_to_type(col_type)));
if (!_is_partial_update ||
_partial_update_input_columns.count(tcolumn_desc.column_name)
> 0) {
if (it == slots_map.end()) {
diff --git a/be/src/io/cache/block/block_file_segment.cpp
b/be/src/io/cache/block/block_file_segment.cpp
index 38d230d9bb8..3b3ac6a5eb3 100644
--- a/be/src/io/cache/block/block_file_segment.cpp
+++ b/be/src/io/cache/block/block_file_segment.cpp
@@ -159,7 +159,8 @@ Status FileBlock::append(Slice data) {
Status st = Status::OK();
if (!_cache_writer) {
auto download_path = get_path_in_local_cache();
- st = global_local_filesystem()->create_file(download_path,
&_cache_writer);
+ FileWriterOptions not_sync {.sync_file_data = false};
+ st = global_local_filesystem()->create_file(download_path, &_cache_writer, &not_sync);
if (!st) {
_cache_writer.reset();
return st;
diff --git a/be/src/io/fs/broker_file_system.cpp
b/be/src/io/fs/broker_file_system.cpp
index 5a4342027b1..3578fb91eaa 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -95,7 +95,8 @@ Status BrokerFileSystem::connect_impl() {
return status;
}
-Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr*
writer) {
+Status BrokerFileSystem::create_file_impl(const Path& path, FileWriterPtr*
writer,
+ const FileWriterOptions* opts) {
*writer = std::make_unique<BrokerFileWriter>(ExecEnv::GetInstance(),
_broker_addr, _broker_prop,
path, 0 /* offset */,
getSPtr());
return Status::OK();
@@ -356,7 +357,7 @@ Status BrokerFileSystem::upload_impl(const Path&
local_file, const Path& remote_
// NOTICE: broker writer must be closed before calling rename
FileWriterPtr broker_writer = nullptr;
- RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer));
+ RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer, nullptr));
constexpr size_t buf_sz = 1024 * 1024;
char read_buf[buf_sz];
@@ -391,7 +392,7 @@ Status BrokerFileSystem::batch_upload_impl(const
std::vector<Path>& local_files,
Status BrokerFileSystem::direct_upload_impl(const Path& remote_file, const
std::string& content) {
FileWriterPtr broker_writer = nullptr;
- RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer));
+ RETURN_IF_ERROR(create_file_impl(remote_file, &broker_writer, nullptr));
RETURN_IF_ERROR(broker_writer->append({content}));
return broker_writer->close();
}
diff --git a/be/src/io/fs/broker_file_system.h
b/be/src/io/fs/broker_file_system.h
index a015f5c1f53..1e29b10a744 100644
--- a/be/src/io/fs/broker_file_system.h
+++ b/be/src/io/fs/broker_file_system.h
@@ -48,7 +48,8 @@ public:
protected:
Status connect_impl() override;
- Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
+ Status create_file_impl(const Path& file, FileWriterPtr* writer,
+ const FileWriterOptions* opts) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
FileReaderSPtr* reader) override;
Status create_directory_impl(const Path& dir, bool failed_if_exists =
false) override;
diff --git a/be/src/io/fs/file_system.cpp b/be/src/io/fs/file_system.cpp
index 989a68884a6..c5d49d83f51 100644
--- a/be/src/io/fs/file_system.cpp
+++ b/be/src/io/fs/file_system.cpp
@@ -22,9 +22,10 @@
namespace doris {
namespace io {
-Status FileSystem::create_file(const Path& file, FileWriterPtr* writer) {
+Status FileSystem::create_file(const Path& file, FileWriterPtr* writer,
+ const FileWriterOptions* opts) {
auto path = absolute_path(file);
- FILESYSTEM_M(create_file_impl(path, writer));
+ FILESYSTEM_M(create_file_impl(path, writer, opts));
}
Status FileSystem::open_file(const FileDescription& fd, const
FileReaderOptions& reader_options,
diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h
index 04e2615fb0b..4a0ee22e08a 100644
--- a/be/src/io/fs/file_system.h
+++ b/be/src/io/fs/file_system.h
@@ -32,6 +32,7 @@
#include "common/status.h"
#include "io/fs/file_reader_options.h"
#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/file_writer_options.h"
#include "io/fs/fs_utils.h"
#include "io/fs/path.h"
@@ -74,7 +75,8 @@ class FileSystem : public
std::enable_shared_from_this<FileSystem> {
public:
// The following are public interface.
// And derived classes should implement all xxx_impl methods.
- Status create_file(const Path& file, FileWriterPtr* writer);
+ Status create_file(const Path& file, FileWriterPtr* writer,
+ const FileWriterOptions* opts = nullptr);
Status open_file(const Path& file, FileReaderSPtr* reader) {
FileDescription fd;
fd.path = file.native();
@@ -115,7 +117,8 @@ public:
protected:
/// create file and return a FileWriter
- virtual Status create_file_impl(const Path& file, FileWriterPtr* writer) =
0;
+ virtual Status create_file_impl(const Path& file, FileWriterPtr* writer,
+ const FileWriterOptions* opts) = 0;
/// open file and return a FileReader
virtual Status open_file_impl(const FileDescription& fd, const Path&
abs_file,
diff --git a/be/src/io/fs/local_file_writer.h
b/be/src/io/fs/file_writer_options.h
similarity index 57%
copy from be/src/io/fs/local_file_writer.h
copy to be/src/io/fs/file_writer_options.h
index 11b2f16434b..511bd81a168 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/io/fs/file_writer_options.h
@@ -17,35 +17,15 @@
#pragma once
-#include <cstddef>
-
-#include "common/status.h"
-#include "io/fs/file_system.h"
-#include "io/fs/file_writer.h"
-#include "io/fs/path.h"
-#include "util/slice.h"
-
namespace doris {
namespace io {
-class LocalFileWriter final : public FileWriter {
-public:
- LocalFileWriter(Path path, int fd, FileSystemSPtr fs);
- LocalFileWriter(Path path, int fd);
- ~LocalFileWriter() override;
-
- Status close() override;
- Status abort() override;
- Status appendv(const Slice* data, size_t data_cnt) override;
- Status write_at(size_t offset, const Slice& data) override;
- Status finalize() override;
-
-private:
- Status _close(bool sync);
-
-private:
- int _fd; // owned
- bool _dirty = false;
+// Only affects remote file writers
+struct FileWriterOptions {
+ bool write_file_cache = false;
+ bool is_cold_data = false;
+ bool sync_file_data = true; // Whether flush data into storage system
+ int64_t file_cache_expiration = 0; // Absolute time
};
} // namespace io
diff --git a/be/src/io/fs/hdfs_file_system.cpp
b/be/src/io/fs/hdfs_file_system.cpp
index 22a1516d07d..b8da1d24e61 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -164,7 +164,8 @@ Status HdfsFileSystem::connect_impl() {
return Status::OK();
}
-Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr*
writer) {
+Status HdfsFileSystem::create_file_impl(const Path& file, FileWriterPtr*
writer,
+ const FileWriterOptions* opts) {
*writer = std::make_unique<HdfsFileWriter>(file, getSPtr());
return Status::OK();
}
@@ -315,7 +316,7 @@ Status HdfsFileSystem::upload_impl(const Path& local_file,
const Path& remote_fi
// 2. open remote file for write
FileWriterPtr hdfs_writer = nullptr;
- RETURN_IF_ERROR(create_file_impl(remote_file, &hdfs_writer));
+ RETURN_IF_ERROR(create_file_impl(remote_file, &hdfs_writer, nullptr));
constexpr size_t buf_sz = 1024 * 1024;
char read_buf[buf_sz];
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index 229d236195a..f0a73ebed93 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -122,7 +122,8 @@ public:
protected:
Status connect_impl() override;
- Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
+ Status create_file_impl(const Path& file, FileWriterPtr* writer,
+ const FileWriterOptions* opts) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
FileReaderSPtr* reader) override;
Status create_directory_impl(const Path& dir, bool failed_if_exists =
false) override;
diff --git a/be/src/io/fs/local_file_system.cpp
b/be/src/io/fs/local_file_system.cpp
index 7e9f88c8b21..2574cec3ef4 100644
--- a/be/src/io/fs/local_file_system.cpp
+++ b/be/src/io/fs/local_file_system.cpp
@@ -55,13 +55,16 @@ LocalFileSystem::LocalFileSystem(Path&& root_path,
std::string&& id)
LocalFileSystem::~LocalFileSystem() = default;
-Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr*
writer) {
+Status LocalFileSystem::create_file_impl(const Path& file, FileWriterPtr*
writer,
+ const FileWriterOptions* opts) {
int fd = ::open(file.c_str(), O_TRUNC | O_WRONLY | O_CREAT | O_CLOEXEC,
0666);
if (-1 == fd) {
return Status::IOError("failed to open {}: {}", file.native(),
errno_to_str());
}
+ bool sync_data = opts != nullptr ? opts->sync_file_data : true;
*writer = std::make_unique<LocalFileWriter>(
- std::move(file), fd,
std::static_pointer_cast<LocalFileSystem>(shared_from_this()));
+ std::move(file), fd,
std::static_pointer_cast<LocalFileSystem>(shared_from_this()),
+ sync_data);
return Status::OK();
}
diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h
index 5ede7b9ab91..da55cf0dfba 100644
--- a/be/src/io/fs/local_file_system.h
+++ b/be/src/io/fs/local_file_system.h
@@ -82,7 +82,8 @@ public:
Status safe_glob(const std::string& path, std::vector<FileInfo>* res);
protected:
- Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
+ Status create_file_impl(const Path& file, FileWriterPtr* writer,
+ const FileWriterOptions* opts) override;
Status open_file_impl(const FileDescription& file_desc, const Path&
abs_path,
const FileReaderOptions& reader_options,
FileReaderSPtr* reader) override;
Status create_directory_impl(const Path& dir, bool failed_if_exists =
false) override;
diff --git a/be/src/io/fs/local_file_writer.cpp
b/be/src/io/fs/local_file_writer.cpp
index 62536dd6876..893c6ea9034 100644
--- a/be/src/io/fs/local_file_writer.cpp
+++ b/be/src/io/fs/local_file_writer.cpp
@@ -67,8 +67,8 @@ Status sync_dir(const io::Path& dirname) {
namespace io {
-LocalFileWriter::LocalFileWriter(Path path, int fd, FileSystemSPtr fs)
- : FileWriter(std::move(path), fs), _fd(fd) {
+LocalFileWriter::LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool
sync_data)
+ : FileWriter(std::move(path), fs), _fd(fd), _sync_data(sync_data) {
_opened = true;
DorisMetrics::instance()->local_file_open_writing->increment(1);
DorisMetrics::instance()->local_file_writer_total->increment(1);
@@ -85,7 +85,7 @@ LocalFileWriter::~LocalFileWriter() {
}
Status LocalFileWriter::close() {
- return _close(true);
+ return _close(_sync_data);
}
Status LocalFileWriter::abort() {
diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h
index 11b2f16434b..59329b178c5 100644
--- a/be/src/io/fs/local_file_writer.h
+++ b/be/src/io/fs/local_file_writer.h
@@ -30,7 +30,7 @@ namespace io {
class LocalFileWriter final : public FileWriter {
public:
- LocalFileWriter(Path path, int fd, FileSystemSPtr fs);
+ LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data =
true);
LocalFileWriter(Path path, int fd);
~LocalFileWriter() override;
@@ -46,6 +46,7 @@ private:
private:
int _fd; // owned
bool _dirty = false;
+ const bool _sync_data;
};
} // namespace io
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index ca4fd0bda8b..79f2a324f44 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -128,7 +128,8 @@ Status S3FileSystem::connect_impl() {
return Status::OK();
}
-Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer)
{
+Status S3FileSystem::create_file_impl(const Path& file, FileWriterPtr* writer,
+ const FileWriterOptions* opts) {
GET_KEY(key, file);
*writer = std::make_unique<S3FileWriter>(key, get_client(), _s3_conf,
getSPtr());
return Status::OK();
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index d2570a10588..301d97c73cd 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -68,7 +68,8 @@ public:
protected:
Status connect_impl() override;
- Status create_file_impl(const Path& file, FileWriterPtr* writer) override;
+ Status create_file_impl(const Path& file, FileWriterPtr* writer,
+ const FileWriterOptions* opts) override;
Status open_file_internal(const FileDescription& fd, const Path& abs_path,
FileReaderSPtr* reader) override;
Status create_directory_impl(const Path& dir, bool failed_if_exists =
false) override;
diff --git a/be/test/olap/tablet_cooldown_test.cpp
b/be/test/olap/tablet_cooldown_test.cpp
index c87a09899e1..2582746291a 100644
--- a/be/test/olap/tablet_cooldown_test.cpp
+++ b/be/test/olap/tablet_cooldown_test.cpp
@@ -135,7 +135,8 @@ public:
~RemoteFileSystemMock() override = default;
protected:
- Status create_file_impl(const Path& path, io::FileWriterPtr* writer)
override {
+ Status create_file_impl(const Path& path, io::FileWriterPtr* writer,
+ const io::FileWriterOptions* opts) override {
Path fs_path = path;
*writer = std::make_unique<FileWriterMock>(fs_path);
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]